AMBARI-20758 Aggregate local metrics for minute aggregation time window (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041d353b Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041d353b Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041d353b Branch: refs/heads/branch-feature-AMBARI-20859 Commit: 041d353b0d75b20b0322097e13a1701226e6fc97 Parents: 772be78 Author: Dmytro Sen <[email protected]> Authored: Wed May 17 19:38:29 2017 +0300 Committer: Dmytro Sen <[email protected]> Committed: Wed May 17 19:38:29 2017 +0300 ---------------------------------------------------------------------- .../logfeeder/metrics/LogFeederAMSClient.java | 12 +- ambari-metrics/ambari-metrics-assembly/pom.xml | 20 +++ .../src/main/assembly/monitor-windows.xml | 7 + .../src/main/assembly/monitor.xml | 9 +- .../timeline/AbstractTimelineMetricsSink.java | 24 ++- .../sink/timeline/AggregationResult.java | 60 +++++++ .../metrics2/sink/timeline/MetricAggregate.java | 110 ++++++++++++ .../sink/timeline/MetricClusterAggregate.java | 73 ++++++++ .../sink/timeline/MetricHostAggregate.java | 81 +++++++++ .../metrics2/sink/timeline/TimelineMetric.java | 6 +- .../TimelineMetricWithAggregatedValues.java | 65 +++++++ .../AbstractTimelineMetricSinkTest.java | 10 ++ .../availability/MetricCollectorHATest.java | 10 ++ .../cache/HandleConnectExceptionTest.java | 10 ++ .../sink/flume/FlumeTimelineMetricsSink.java | 16 ++ .../timeline/HadoopTimelineMetricsSink.java | 20 ++- .../conf/unix/log4j.properties | 31 ++++ .../conf/windows/log4j.properties | 29 +++ .../ambari-metrics-host-aggregator/pom.xml | 120 +++++++++++++ .../AbstractMetricPublisherThread.java | 134 ++++++++++++++ .../aggregator/AggregatedMetricsPublisher.java | 101 +++++++++++ .../host/aggregator/AggregatorApplication.java | 180 +++++++++++++++++++ .../host/aggregator/AggregatorWebService.java | 56 ++++++ .../host/aggregator/RawMetricsPublisher.java | 60 +++++++ .../host/aggregator/TimelineMetricsHolder.java | 98 ++++++++++ 
.../conf/unix/ambari-metrics-monitor | 2 +- .../src/main/python/core/aggregator.py | 110 ++++++++++++ .../src/main/python/core/config_reader.py | 35 +++- .../src/main/python/core/controller.py | 28 +++ .../src/main/python/core/emitter.py | 8 +- .../src/main/python/core/stop_handler.py | 3 +- .../src/main/python/main.py | 6 +- .../kafka/KafkaTimelineMetricsReporter.java | 17 ++ .../storm/StormTimelineMetricsReporter.java | 14 ++ .../sink/storm/StormTimelineMetricsSink.java | 14 ++ .../storm/StormTimelineMetricsReporter.java | 16 ++ .../sink/storm/StormTimelineMetricsSink.java | 16 ++ .../timeline/HBaseTimelineMetricStore.java | 29 ++- .../metrics/timeline/PhoenixHBaseAccessor.java | 4 +- .../timeline/TimelineMetricConfiguration.java | 2 + .../metrics/timeline/TimelineMetricStore.java | 2 + .../timeline/TimelineMetricsAggregatorSink.java | 4 +- .../timeline/aggregators/MetricAggregate.java | 110 ------------ .../aggregators/MetricClusterAggregate.java | 73 -------- .../aggregators/MetricHostAggregate.java | 81 --------- .../TimelineMetricAppAggregator.java | 1 + .../TimelineMetricClusterAggregator.java | 2 + .../TimelineMetricClusterAggregatorSecond.java | 1 + .../TimelineMetricHostAggregator.java | 1 + .../aggregators/TimelineMetricReadHelper.java | 2 + .../webapp/TimelineWebServices.java | 31 ++++ .../timeline/ITPhoenixHBaseAccessor.java | 4 +- .../metrics/timeline/MetricTestHelper.java | 2 +- .../timeline/PhoenixHBaseAccessorTest.java | 4 +- .../timeline/TestMetricHostAggregate.java | 8 +- .../timeline/TestTimelineMetricStore.java | 6 + .../TimelineMetricsAggregatorMemorySink.java | 4 +- .../aggregators/ITClusterAggregator.java | 4 +- .../aggregators/ITMetricAggregator.java | 13 +- ...melineMetricClusterAggregatorSecondTest.java | 1 + ambari-metrics/pom.xml | 1 + .../system/impl/AmbariMetricSinkImpl.java | 10 ++ .../1.6.1.2.2.0/package/scripts/params.py | 2 + .../hadoop-metrics2-accumulo.properties.j2 | 3 + .../0.1.0/configuration/ams-env.xml | 8 + 
.../0.1.0/configuration/ams-site.xml | 11 ++ .../AMBARI_METRICS/0.1.0/metainfo.xml | 3 + .../AMBARI_METRICS/0.1.0/package/scripts/ams.py | 30 ++++ .../0.1.0/package/scripts/params.py | 5 + .../hadoop-metrics2-hbase.properties.j2 | 3 + .../package/templates/metric_monitor.ini.j2 | 7 + .../FLUME/1.4.0.2.0/package/scripts/params.py | 3 + .../templates/flume-metrics2.properties.j2 | 2 + .../0.96.0.2.0/package/scripts/params_linux.py | 3 + ...-metrics2-hbase.properties-GANGLIA-MASTER.j2 | 2 + ...doop-metrics2-hbase.properties-GANGLIA-RS.j2 | 2 + .../hadoop-metrics2.properties.xml | 2 + .../0.12.0.2.0/package/scripts/params_linux.py | 2 + .../hadoop-metrics2-hivemetastore.properties.j2 | 2 + .../hadoop-metrics2-hiveserver2.properties.j2 | 2 + .../templates/hadoop-metrics2-llapdaemon.j2 | 2 + .../hadoop-metrics2-llaptaskscheduler.j2 | 2 + .../2.1.0.3.0/package/scripts/params_linux.py | 3 + .../hadoop-metrics2-hivemetastore.properties.j2 | 2 + .../hadoop-metrics2-hiveserver2.properties.j2 | 2 + .../templates/hadoop-metrics2-llapdaemon.j2 | 2 + .../hadoop-metrics2-llaptaskscheduler.j2 | 2 + .../KAFKA/0.8.1/configuration/kafka-broker.xml | 11 ++ .../KAFKA/0.8.1/package/scripts/params.py | 3 + .../STORM/0.9.1/package/scripts/params_linux.py | 2 + .../0.9.1/package/templates/config.yaml.j2 | 2 + .../templates/storm-metrics2.properties.j2 | 2 + .../2.0.6/hooks/before-START/scripts/params.py | 3 + .../templates/hadoop-metrics2.properties.j2 | 2 + .../hadoop-metrics2.properties.xml | 2 + .../3.0/hooks/before-START/scripts/params.py | 2 + .../templates/hadoop-metrics2.properties.j2 | 2 + .../system/impl/TestAmbariMetricsSinkImpl.java | 10 ++ .../2.0/hooks/before-START/scripts/params.py | 2 + 99 files changed, 1854 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java index 2d1bf40..39526a5 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java @@ -89,6 +89,16 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink { } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return false; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return 0; + } + + @Override protected boolean emitMetrics(TimelineMetrics metrics) { return super.emitMetrics(metrics); } @@ -103,4 +113,4 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink { return collectorPort; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/pom.xml b/ambari-metrics/ambari-metrics-assembly/pom.xml index a4b87de..6b81de5 100644 --- a/ambari-metrics/ambari-metrics-assembly/pom.xml +++ b/ambari-metrics/ambari-metrics-assembly/pom.xml @@ -35,6 +35,7 @@ <properties> <collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir> <monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir> + <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir> 
<grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir> <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir> <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir> @@ -599,6 +600,19 @@ </sources> </mapping> <mapping> + <directory>/var/lib/ambari-metrics-monitor/lib</directory> + <sources> + <source> + <location> + ${aggregator.dir}/target/ + </location> + <includes> + <include>ambari-metrics-host-aggregator-${project.version}.jar</include> + </includes> + </source> + </sources> + </mapping> + <mapping> <directory>/etc/ambari-metrics-monitor/conf</directory> <configuration>true</configuration> </mapping> @@ -744,6 +758,7 @@ <path>/var/run/ambari-metrics-grafana</path> <path>/var/log/ambari-metrics-grafana</path> <path>/var/lib/ambari-metrics-collector</path> + <path>/var/lib/ambari-metrics-monitor/lib</path> <path>/var/lib/ambari-metrics-grafana</path> <path>/usr/lib/ambari-metrics-hadoop-sink</path> <path>/usr/lib/ambari-metrics-kafka-sink</path> @@ -1331,6 +1346,11 @@ <type>pom</type> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-host-aggregator</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml index ab309a1..d015d31 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml +++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml @@ -64,6 +64,13 @@ </includes> </fileSet> <fileSet> + <directory>${aggregator.dir}/conf/windows</directory> + 
<outputDirectory>conf</outputDirectory> + <includes> + <include>log4j.properties</include> + </includes> + </fileSet> + <fileSet> <directory>${monitor.dir}/conf/windows</directory> <outputDirectory>/</outputDirectory> <includes> http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml index 99a41c3..448fe62 100644 --- a/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml +++ b/ambari-metrics/ambari-metrics-assembly/src/main/assembly/monitor.xml @@ -46,6 +46,13 @@ </includes> </fileSet> <fileSet> + <directory>${aggregator.dir}/conf/unix</directory> + <outputDirectory>conf</outputDirectory> + <includes> + <include>log4j.properties</include> + </includes> + </fileSet> + <fileSet> <directory>${monitor.dir}/conf/unix</directory> <outputDirectory>bin</outputDirectory> <includes> @@ -68,4 +75,4 @@ -</assembly> \ No newline at end of file +</assembly> http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 2c6fae2..a8dc571 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -78,6 +78,8 @@ 
public abstract class AbstractTimelineMetricsSink { public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path"; public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type"; public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password"; + public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation"; + public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port"; public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes"; public static final String INSTANCE_ID_PROPERTY = "instanceId"; public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId"; @@ -241,8 +243,14 @@ public abstract class AbstractTimelineMetricsSink { } protected boolean emitMetrics(TimelineMetrics metrics) { - String collectorHost = getCurrentCollectorHost(); - String connectUrl = getCollectorUri(collectorHost); + String connectUrl; + if (isHostInMemoryAggregationEnabled()) { + connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort())); + } else { + String collectorHost = getCurrentCollectorHost(); + connectUrl = getCollectorUri(collectorHost); + } + String jsonData = null; LOG.debug("EmitMetrics connectUrl = " + connectUrl); try { @@ -562,4 +570,16 @@ public abstract class AbstractTimelineMetricsSink { * @return String "host1" */ abstract protected String getHostname(); + + /** + * Check if host in-memory aggregation is enabled + * @return + */ + abstract protected boolean isHostInMemoryAggregationEnabled(); + + /** + * In memory aggregation port + * @return + */ + abstract protected int getHostInMemoryAggregationPort(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java new file mode 100644 index 0000000..c903e3d --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.Set; + +@XmlRootElement(name="AggregationResult") +public class AggregationResult { + protected Set<TimelineMetricWithAggregatedValues> result; + protected Long timeInMilis; + + @Override + public String toString() { + return "AggregationResult{" + + "result=" + result + + ", timeInMilis=" + timeInMilis + + '}'; + } + + public AggregationResult() { + } + + public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) { + this.result = result; + this.timeInMilis = timeInMilis; + } + @XmlElement + public Set<TimelineMetricWithAggregatedValues> getResult() { + return result; + } + + public void setResult(Set<TimelineMetricWithAggregatedValues> result) { + this.result = result; + } + @XmlElement + public Long getTimeInMilis() { + return timeInMilis; + } + + public void setTimeInMilis(Long timeInMilis) { + this.timeInMilis = timeInMilis; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java new file mode 100644 index 0000000..84cba0e --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** +* +*/ +@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), + @JsonSubTypes.Type(value = MetricHostAggregate.class)}) +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MetricAggregate { + private static final ObjectMapper mapper = new ObjectMapper(); + + protected Double sum = 0.0; + protected Double deviation; + protected Double max = Double.MIN_VALUE; + protected Double min = Double.MAX_VALUE; + + public MetricAggregate() { + } + + MetricAggregate(Double sum, Double deviation, Double max, + Double min) { + this.sum = sum; + this.deviation = deviation; + this.max = max; + this.min = min; + } + + public void updateSum(Double sum) { + this.sum += sum; + } + + public void updateMax(Double max) { + if (max > this.max) { + this.max = max; + } + } + + public void updateMin(Double min) { + if (min < this.min) { + this.min = min; + } + } + + @JsonProperty("sum") + public Double getSum() { + return sum; + } + + @JsonProperty("deviation") + public Double getDeviation() { + return deviation; + } + +
@JsonProperty("max") + public Double getMax() { + return max; + } + + @JsonProperty("min") + public Double getMin() { + return min; + } + + public void setSum(Double sum) { + this.sum = sum; + } + + public void setDeviation(Double deviation) { + this.deviation = deviation; + } + + public void setMax(Double max) { + this.max = max; + } + + public void setMin(Double min) { + this.min = min; + } + + public String toJSON() throws IOException { + return mapper.writeValueAsString(this); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java new file mode 100644 index 0000000..7ef2c1d --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** +* +*/ +public class MetricClusterAggregate extends MetricAggregate { + private int numberOfHosts; + + @JsonCreator + public MetricClusterAggregate() { + } + + public MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfHosts = numberOfHosts; + } + + @JsonProperty("numberOfHosts") + public int getNumberOfHosts() { + return numberOfHosts; + } + + public void updateNumberOfHosts(int count) { + this.numberOfHosts += count; + } + + public void setNumberOfHosts(int numberOfHosts) { + this.numberOfHosts = numberOfHosts; + } + + /** + * Find and update min, max and avg for a minute + */ + public void updateAggregates(MetricClusterAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfHosts(hostAggregate.getNumberOfHosts()); + } + + @Override + public String toString() { + return "MetricAggregate{" + + "sum=" + sum + + ", numberOfHosts=" + numberOfHosts + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java new file mode 100644 index 0000000..e190913 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Represents a collection of minute based aggregation of values for + * resolution greater than a minute. + */ +public class MetricHostAggregate extends MetricAggregate { + + private long numberOfSamples = 0; + + @JsonCreator + public MetricHostAggregate() { + super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); + } + + public MetricHostAggregate(Double sum, int numberOfSamples, + Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfSamples = numberOfSamples; + } + + @JsonProperty("numberOfSamples") + public long getNumberOfSamples() { + return numberOfSamples == 0 ? 
1 : numberOfSamples; + } + + public void updateNumberOfSamples(long count) { + this.numberOfSamples += count; + } + + public void setNumberOfSamples(long numberOfSamples) { + this.numberOfSamples = numberOfSamples; + } + + public double calculateAverage() { + return sum / numberOfSamples; + } + + /** + * Find and update min, max and avg for a minute + */ + public void updateAggregates(MetricHostAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfSamples(hostAggregate.getNumberOfSamples()); + } + + @Override + public String toString() { + return "MetricHostAggregate{" + + "sum=" + sum + + ", numberOfSamples=" + numberOfSamples + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java index 44c9d4a..edace52 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -45,7 +45,7 @@ public class TimelineMetric implements Comparable<TimelineMetric> { private String type; private String units; private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); - private Map<String, String> metadata = new HashMap<>(); + private HashMap<String, String> metadata = new HashMap<>(); // default public TimelineMetric() { @@ -151,11 +151,11 @@ public class TimelineMetric implements 
Comparable<TimelineMetric> { } @XmlElement(name = "metadata") - public Map<String,String> getMetadata () { + public HashMap<String,String> getMetadata () { return metadata; } - public void setMetadata (Map<String,String> metadata) { + public void setMetadata (HashMap<String,String> metadata) { this.metadata = metadata; } http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java new file mode 100644 index 0000000..626ac5f --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; + + +@XmlRootElement(name = "TimelineMetricWithAggregatedValues") +@XmlAccessorType(XmlAccessType.NONE) +public class TimelineMetricWithAggregatedValues { + private TimelineMetric timelineMetric; + private MetricHostAggregate metricAggregate; + + public TimelineMetricWithAggregatedValues() { + } + + public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) { + timelineMetric = metric; + this.metricAggregate = metricAggregate; + } + + @XmlElement + public MetricHostAggregate getMetricAggregate() { + return metricAggregate; + } + @XmlElement + public TimelineMetric getTimelineMetric() { + return timelineMetric; + } + + public void setTimelineMetric(TimelineMetric timelineMetric) { + this.timelineMetric = timelineMetric; + } + + public void setMetricAggregate(MetricHostAggregate metricAggregate) { + this.metricAggregate = metricAggregate; + } + + @Override + public String toString() { + return "TimelineMetricWithAggregatedValues{" + + "timelineMetric=" + timelineMetric + + ", metricAggregate=" + metricAggregate + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java index 9b0cdbe..ce2cf79 100644 --- 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java @@ -90,6 +90,16 @@ public class AbstractTimelineMetricSinkTest { } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return true; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return 61888; + } + + @Override public boolean emitMetrics(TimelineMetrics metrics) { super.init(); return super.emitMetrics(metrics); http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java index a393a96..f0174d5 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java @@ -192,5 +192,15 @@ public class MetricCollectorHATest { protected String getHostname() { return "h1"; } + + @Override + protected boolean isHostInMemoryAggregationEnabled() { + return true; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return 61888; + } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java index 32fe32e..4eb75eb 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java @@ -125,6 +125,16 @@ public class HandleConnectExceptionTest { } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return false; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return 61888; + } + + @Override public boolean emitMetrics(TimelineMetrics metrics) { super.init(); return super.emitMetrics(metrics); http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java index 904c916..6277907 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -63,6 +63,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem private int timeoutSeconds = 10; private boolean setInstanceId; private String instanceId; + 
private boolean hostInMemoryAggregationEnabled; + private int hostInMemoryAggregationPort; + @Override public void start() { @@ -110,6 +113,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem port = configuration.getProperty(COLLECTOR_PORT, "6188"); setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false")); instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, ""); + + hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY)); + hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY)); // Initialize the collector write strategy super.init(); @@ -162,6 +168,16 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem return hostname; } + @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + public void setPollFrequency(long pollFrequency) { this.pollFrequency = pollFrequency; } http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 11e16c2..c235c7c 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -75,6 +75,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple return t; } }); + private int hostInMemoryAggregationPort; + private boolean hostInMemoryAggregationEnabled; @Override public void init(SubsetConfiguration conf) { @@ -107,7 +109,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple protocol = conf.getString(COLLECTOR_PROTOCOL, "http"); collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY)); port = conf.getString(COLLECTOR_PORT, "6188"); - + hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY); + hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY); if (collectorHosts.isEmpty()) { LOG.error("No Metric collector configured."); } else { @@ -249,6 +252,16 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } @Override + protected boolean isHostInMemoryAggregationEnabled() { + return hostInMemoryAggregationEnabled; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return hostInMemoryAggregationPort; + } + + @Override public void putMetrics(MetricsRecord record) { try { String recordName = record.name(); @@ -308,9 +321,10 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple int sbBaseLen = sb.length(); List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); - Map<String, String> metadata = null; + HashMap<String, String> metadata = null; if (skipAggregation) { - metadata = Collections.singletonMap("skipAggregation", "true"); + metadata = new HashMap<>(); + metadata.put("skipAggregation", "true"); } long startTime = record.timestamp(); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties new file mode 100644 index 0000000..d7ceedd --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/unix/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Define some default values that can be overridden by system properties +# Root logger option +log4j.rootLogger=INFO,file + +# Direct log messages to a log file +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log +log4j.appender.file.MaxFileSize=80MB +log4j.appender.file.MaxBackupIndex=60 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n + + http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties new file mode 100644 index 0000000..d9aabab --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/conf/windows/log4j.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Define some default values that can be overridden by system properties +# Root logger option +log4j.rootLogger=INFO,file + +# Direct log messages to a log file +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log +log4j.appender.file.MaxFileSize=80MB +log4j.appender.file.MaxBackupIndex=60 +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml new file mode 100644 index 0000000..c2c7897 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/pom.xml @@ -0,0 +1,120 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>ambari-metrics</artifactId> + <groupId>org.apache.ambari</groupId> + <version>2.0.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>ambari-metrics-host-aggregator</artifactId> + <packaging>jar</packaging> + + <name>ambari-metrics-host-aggregator</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>14.0.1</version> + </dependency> + <dependency> + <groupId>org.apache.ambari</groupId> + <artifactId>ambari-metrics-common</artifactId> + <version>2.0.0.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>servlet-api</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + <version>1.11</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + <version>1.11</version> + </dependency> + <dependency> + <groupId>javax.xml.bind</groupId> + <artifactId>jaxb-api</artifactId> + <version>2.2.2</version> + </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + <version>1.11</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.7.1.2.3.4.0-3347</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>1.6</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + </configuration> + + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java new file mode 100644 index 0000000..b1f60fa --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AbstractMetricPublisherThread.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.host.aggregator; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.codehaus.jackson.map.AnnotationIntrospector; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.codehaus.jackson.xc.JaxbAnnotationIntrospector; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Map; + +/** + * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals. 
+ */ +public abstract class AbstractMetricPublisherThread extends Thread { + protected int publishIntervalInSeconds; + protected String publishURL; + protected ObjectMapper objectMapper; + private Log LOG; + protected TimelineMetricsHolder timelineMetricsHolder; + + public AbstractMetricPublisherThread(TimelineMetricsHolder timelineMetricsHolder, String publishURL, int publishIntervalInSeconds) { + LOG = LogFactory.getLog(this.getClass()); + this.publishURL = publishURL; + this.publishIntervalInSeconds = publishIntervalInSeconds; + this.timelineMetricsHolder = timelineMetricsHolder; + objectMapper = new ObjectMapper(); + AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); + objectMapper.setAnnotationIntrospector(introspector); + objectMapper.getSerializationConfig() + .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + } + + /** + * Publishes metrics to collector in specified intervals while not interrupted. + */ + @Override + public void run() { + while (!isInterrupted()) { + try { + sleep(this.publishIntervalInSeconds * 1000); + } catch (InterruptedException e) { + //Ignore + } + try { + processAndPublishMetrics(getMetricsFromCache()); + } catch (Exception e) { + LOG.error("Couldn't process and send metrics : ",e); + } + } + } + + /** + * Processes and sends metrics to collector. + * @param metricsFromCache + * @throws Exception + */ + protected void processAndPublishMetrics(Map<Long, TimelineMetrics> metricsFromCache) throws Exception { + if (metricsFromCache.size()==0) return; + + LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size())); + publishMetricsJson(processMetrics(metricsFromCache)); + } + + /** + * Returns metrics map. Source is based on implementation. 
+ * @return + */ + protected abstract Map<Long,TimelineMetrics> getMetricsFromCache(); + + /** + * Processes given metrics (aggregates or merges them) and converts them into json string that will be sent to collector + * @param metricValues + * @return + */ + protected abstract String processMetrics(Map<Long, TimelineMetrics> metricValues); + + protected void publishMetricsJson(String jsonData) throws Exception { + int timeout = 5 * 1000; + HttpURLConnection connection = null; + if (this.publishURL == null) { + throw new IOException("Unknown URL. Unable to connect to metrics collector."); + } + LOG.info("Collector URL : " + publishURL); + connection = (HttpURLConnection) new URL(this.publishURL).openConnection(); + + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Connection", "Keep-Alive"); + connection.setConnectTimeout(timeout); + connection.setReadTimeout(timeout); + connection.setDoOutput(true); + + if (jsonData != null) { + try (OutputStream os = connection.getOutputStream()) { + os.write(jsonData.getBytes("UTF-8")); + } + } + int responseCode = connection.getResponseCode(); + if (responseCode != 200) { + throw new Exception("responseCode is " + responseCode); + } + LOG.info("Successfully sent metrics."); + } + + /** + * Interrupts the thread. 
+ */ + protected void stopPublisher() { + this.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java new file mode 100644 index 0000000..0540ec9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatedMetricsPublisher.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.host.aggregator; + + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +/** + * Thread that aggregates and publishes metrics to collector on specified interval. + */ +public class AggregatedMetricsPublisher extends AbstractMetricPublisherThread { + + private Log LOG; + + public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) { + super(timelineMetricsHolder, collectorURL, interval); + LOG = LogFactory.getLog(this.getClass()); + } + + /** + * Gets the metrics map from @TimelineMetricsHolder + * @return + */ + @Override + protected Map<Long, TimelineMetrics> getMetricsFromCache() { + return timelineMetricsHolder.extractMetricsForAggregationPublishing(); + } + + /** + * Aggregates given metrics and converts them into json string that will be sent to collector + * @param metricForAggregationValues + * @return + */ + @Override + protected String processMetrics(Map<Long, TimelineMetrics> metricForAggregationValues) { + HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>(); + for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) { + for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { + if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) { + nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics()); + } + 
nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric); + } + } + Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>(); + for (TimelineMetrics metrics : nameToMetricMap.values()) { + double sum = 0; + double max = Integer.MIN_VALUE; + double min = Integer.MAX_VALUE; + int count = 0; + for (TimelineMetric metric : metrics.getMetrics()) { + for (Double value : metric.getMetricValues().values()) { + sum+=value; + max = Math.max(max, value); + min = Math.min(min, value); + count++; + } + } + TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0)); + tmpMetric.setMetricValues(new TreeMap<Long, Double>()); + metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min))); + } + String json = null; + try { + json = objectMapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis())); + LOG.debug(json); + } catch (Exception e) { + LOG.error("Failed to convert result into json", e); + } + + return json; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java new file mode 100644 index 0000000..c6b703b --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.host.aggregator; + +import com.sun.jersey.api.container.httpserver.HttpServerFactory; +import com.sun.jersey.api.core.PackagesResourceConfig; +import com.sun.jersey.api.core.ResourceConfig; +import com.sun.net.httpserver.HttpServer; + +import javax.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.net.URL; +import java.net.UnknownHostException; +import java.util.HashMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * WEB application with 2 publisher threads that processes received metrics and submits results to the collector + */ +public class AggregatorApplication +{ + private static final int STOP_SECONDS_DELAY = 0; + private static final int JOIN_SECONDS_TIMEOUT = 2; + private static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics"; + private static String AGGREGATED_POST_PREFIX = "/aggregated"; + private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; + private static Log LOG = LogFactory.getLog("AggregatorApplication.class"); + private final int webApplicationPort; + private final int rawPublishingInterval; + private final int 
aggregationInterval; + private Configuration configuration; + private String [] collectorHosts; + private AggregatedMetricsPublisher aggregatePublisher; + private RawMetricsPublisher rawPublisher; + private TimelineMetricsHolder timelineMetricsHolder; + private HttpServer httpServer; + + public AggregatorApplication(String collectorHosts) { + initConfiguration(); + this.collectorHosts = collectorHosts.split(","); + this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300); + this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60); + this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888); + this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval); + try { + this.httpServer = createHttpServer(); + } catch (IOException e) { + LOG.error("Exception while starting HTTP server. Exiting", e); + System.exit(1); + } + } + + private void initConfiguration() { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + + URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); + LOG.info("Found metric service configuration: " + amsResUrl); + if (amsResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No ams-site present in the classpath."); + } + configuration = new Configuration(true); + try { + configuration.addResource(amsResUrl.toURI().toURL()); + } catch (Exception e) { + LOG.error("Couldn't init configuration. 
", e); + System.exit(1); + } + } + + private String getHostName() { + String hostName = "localhost"; + try { + hostName = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.error(e); + } + return hostName; + } + + private URI getURI() { + URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build(); + LOG.info(String.format("Web server at %s", uri)); + return uri; + } + + private HttpServer createHttpServer() throws IOException { + ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator"); + HashMap<String, Object> params = new HashMap(); + params.put("com.sun.jersey.api.json.POJOMappingFeature", "true"); + resourceConfig.setPropertiesAndFeatures(params); + return HttpServerFactory.create(getURI(), resourceConfig); + } + + private void startWebServer() { + LOG.info("Starting web server."); + this.httpServer.start(); + } + + private void startAggregatePublisherThread() { + LOG.info("Starting aggregated metrics publisher."); + String collectorURL = buildBasicCollectorURL(collectorHosts[0]) + AGGREGATED_POST_PREFIX; + aggregatePublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, collectorURL, aggregationInterval); + aggregatePublisher.start(); + } + + private void startRawPublisherThread() { + LOG.info("Starting raw metrics publisher."); + String collectorURL = buildBasicCollectorURL(collectorHosts[0]); + rawPublisher = new RawMetricsPublisher(timelineMetricsHolder, collectorURL, rawPublishingInterval); + rawPublisher.start(); + } + + + + private void stop() { + aggregatePublisher.stopPublisher(); + rawPublisher.stopPublisher(); + httpServer.stop(STOP_SECONDS_DELAY); + LOG.info("Stopped web server."); + try { + LOG.info("Waiting for threads to join."); + aggregatePublisher.join(JOIN_SECONDS_TIMEOUT * 1000); + rawPublisher.join(JOIN_SECONDS_TIMEOUT * 1000); + LOG.info("Gracefully stopped Aggregator Application."); + } catch 
(InterruptedException e) { + LOG.error("Received exception during stop : ", e); + + } + + } + + private String buildBasicCollectorURL(String host) { + String port = configuration.get("timeline.metrics.service.webapp.address", "0.0.0.0:6188").split(":")[1]; + String protocol = configuration.get("timeline.metrics.service.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https"; + return String.format(BASE_POST_URL, protocol, host, port); + } + + public static void main( String[] args ) throws Exception { + LOG.info("Starting aggregator application"); + if (args.length != 1) { + throw new Exception("This jar should be run with 1 argument - collector hosts separated with coma"); + } + + final AggregatorApplication app = new AggregatorApplication(args[0]); + app.startAggregatePublisherThread(); + app.startRawPublisherThread(); + app.startWebServer(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + LOG.info("Stopping aggregator application"); + app.stop(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java new file mode 100644 index 0000000..f96d0ed --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.host.aggregator; + + + +import com.sun.jersey.spi.resource.Singleton; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.io.IOException; + +@Singleton +@Path("/ws/v1/timeline") +public class AggregatorWebService { + TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance(); + + @GET + @Produces("text/json") + @Path("/metrics") + public Response helloWorld() throws IOException { + return Response.ok().build(); + } + + @POST + @Produces(MediaType.TEXT_PLAIN) + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + @Path("/metrics") + public Response postMetrics( + TimelineMetrics metrics) { + metricsHolder.putMetricsForAggregationPublishing(metrics); + metricsHolder.putMetricsForRawPublishing(metrics); + return Response.ok().build(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java 
---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java new file mode 100644 index 0000000..f317ed9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/RawMetricsPublisher.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.host.aggregator; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.util.Map; + +public class RawMetricsPublisher extends AbstractMetricPublisherThread { + private final Log LOG; + + public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, String collectorURL, int interval) { + super(timelineMetricsHolder, collectorURL, interval); + LOG = LogFactory.getLog(this.getClass()); + } + + + @Override + protected Map<Long, TimelineMetrics> getMetricsFromCache() { + return timelineMetricsHolder.extractMetricsForRawPublishing(); + } + + @Override + protected String processMetrics(Map<Long, TimelineMetrics> metricValues) { + //merge everything in one TimelineMetrics object + TimelineMetrics timelineMetrics = new TimelineMetrics(); + for (TimelineMetrics metrics : metricValues.values()) { + for (TimelineMetric timelineMetric : metrics.getMetrics()) + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + } + //map TimelineMetrics to json string + String json = null; + try { + json = objectMapper.writeValueAsString(timelineMetrics); + LOG.debug(json); + } catch (Exception e) { + LOG.error("Failed to convert result into json", e); + } + return json; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java new file mode 100644 index 0000000..b355c97 --- 
/dev/null +++ b/ambari-metrics/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.host.aggregator; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Singleton class with 2 guava caches for raw and aggregated metrics storing + */ +public class TimelineMetricsHolder { + private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60; + private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300; + private Cache<Long, TimelineMetrics> aggregationMetricsCache; + private Cache<Long, TimelineMetrics> rawMetricsCache; + private static TimelineMetricsHolder instance = null; + //to ensure no metric values are expired + private static int EXPIRE_DELAY = 30; + ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock(); + ReadWriteLock 
rawCacheLock = new ReentrantReadWriteLock(); + + private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) { + this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build(); + this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build(); + } + + public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) { + if (instance == null) { + instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime); + } + return instance; + } + + /** + * Uses default expiration time for caches initialization if they are not initialized yet. + * @return + */ + public static TimelineMetricsHolder getInstance() { + return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME); + } + + public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) { + aggregationCacheLock.writeLock().lock(); + aggregationMetricsCache.put(System.currentTimeMillis(), timelineMetrics); + aggregationCacheLock.writeLock().unlock(); + } + + public Map<Long, TimelineMetrics> extractMetricsForAggregationPublishing() { + return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock); + } + + public void putMetricsForRawPublishing(TimelineMetrics metrics) { + rawCacheLock.writeLock().lock(); + rawMetricsCache.put(System.currentTimeMillis(), metrics); + rawCacheLock.writeLock().unlock(); + } + + public Map<Long, TimelineMetrics> extractMetricsForRawPublishing() { + return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock); + } + + /** + * Returns values from cache and clears the cache + * @param cache + * @param lock + * @return + */ + private Map<Long, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<Long, TimelineMetrics> cache, ReadWriteLock lock) { + lock.writeLock().lock(); + Map<Long, 
TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap()); + cache.invalidateAll(); + lock.writeLock().unlock(); + return metricsMap; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor index 967e133..9bbb271 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor +++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor @@ -24,7 +24,7 @@ METRIC_MONITOR_PY_SCRIPT=${RESOURCE_MONITORING_DIR}/main.py PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out -STOP_TIMEOUT=5 +STOP_TIMEOUT=10 OK=0 NOTOK=1 http://git-wip-us.apache.org/repos/asf/ambari/blob/041d353b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py new file mode 100644 index 0000000..2249e53 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
logger = logging.getLogger()


class Aggregator(threading.Thread):
  """Thread that launches the Java host aggregator and keeps it running.

  The aggregator is started as a child process when the thread runs and is
  terminated when the stop handler fires or stop() is called.
  """

  def __init__(self, config, stop_handler):
    """
    config       -- configuration object; java home, collector hosts, JVM
                    arguments, config dir and log dir are read from it
    stop_handler -- object whose wait(timeout) returns 0 once a stop was
                    requested
    """
    threading.Thread.__init__(self)
    self._config = config
    self._stop_handler = stop_handler
    self._aggregator_process = None
    self._sleep_interval = config.get_collector_sleep_interval()
    self.stopped = False

  def run(self):
    """Start the aggregator JVM, block until a stop is requested, then stop it."""
    java_home = self._config.get_java_home()
    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
    jvm_args = self._config.get_aggregator_jvm_agrs()
    config_dir = self._config.get_config_dir()
    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
    ams_log_file = "ambari-metrics-aggregator.log"
    # The config dir is appended to the classpath after the lib directory.
    additional_classpath = ':{0}'.format(config_dir)
    ams_log_dir = self._config.ams_monitor_log_dir()
    logger.info('Starting Aggregator thread.')
    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6}"\
      .format(java_home, jvm_args, ams_log_dir, ams_log_file, additional_classpath, class_name, collector_hosts)

    logger.info("Executing : {0}".format(cmd))

    # With shell=True the command is passed as one string, which is the
    # documented form; a one-element list only happens to work on POSIX.
    self._aggregator_process = subprocess.Popen(cmd, stdout=None, stderr=None, shell=True)
    while not self.stopped:
      if 0 == self._stop_handler.wait(self._sleep_interval):
        break
    self.stop()

  def stop(self):
    """Mark the thread as stopped and terminate the child process if it is still alive.

    May be called more than once (run() calls it on exit as well).
    """
    self.stopped = True
    process = self._aggregator_process
    # poll() is None only while the child is running; skip the signal and
    # the log line when there is nothing left to stop.
    if process is not None and process.poll() is None:
      logger.info('Stopping Aggregator thread.')
      process.terminate()


class AggregatorWatchdog(threading.Thread):
  """Thread that periodically probes the local aggregator over HTTP.

  The result of the last probe is available through is_ok().
  """
  SLEEP_TIME = 30
  CONNECTION_TIMEOUT = 5
  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"

  def __init__(self, config, stop_handler):
    """
    config       -- configuration object; the aggregator port is read from it
    stop_handler -- object whose wait(timeout) returns 0 once a stop was
                    requested
    """
    threading.Thread.__init__(self)
    self._config = config
    self._stop_handler = stop_handler
    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
    self._is_ok = threading.Event()
    # Start in the healthy state until a probe says otherwise.
    self.set_is_ok(True)
    self.stopped = False

  def run(self):
    """Probe the aggregator every SLEEP_TIME seconds until a stop is requested."""
    logger.info('Starting Aggregator Watchdog thread.')
    while not self.stopped:
      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
        break
      self.set_is_ok(self._probe())

  def _probe(self):
    """Return True when the check URL answers with HTTP 200, False otherwise."""
    try:
      conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
    except (KeyboardInterrupt, SystemExit):
      raise
    except Exception:
      return False
    try:
      return conn.code == 200
    finally:
      # Close on every path; a non-200 answer used to leave it open.
      conn.close()

  def is_ok(self):
    """Return the outcome of the most recent set_is_ok() call."""
    return self._is_ok.is_set()

  def set_is_ok(self, value):
    """Record the health state; warn only on the transition from ok to not ok."""
    if value:
      self._is_ok.set()
      return
    if self.is_ok():
      logger.warning("Watcher couldn't connect to aggregator.")
    # Always clear on failure. A repeated failure must not flip the flag
    # back to ok.
    self._is_ok.clear()

  def stop(self):
    """Ask the run() loop to exit at its next iteration."""
    logger.info('Stopping watcher thread.')
    self.stopped = True
ConfigDefaults(object): + def get_config_dir(self): + pass def get_config_file_path(self): pass def get_metric_file_path(self): @@ -40,11 +42,14 @@ class ConfigDefaults(object): @OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY) class ConfigDefaultsWindows(ConfigDefaults): def __init__(self): + self._CONFIG_DIR = "conf" self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini" self._METRIC_FILE_PATH = "conf\\metric_groups.conf" self._METRIC_FILE_PATH = "conf\\ca.pem" pass + def get_config_dir(self): + return self._CONFIG_DIR def get_config_file_path(self): return self._CONFIG_FILE_PATH def get_metric_file_path(self): @@ -55,11 +60,13 @@ class ConfigDefaultsWindows(ConfigDefaults): @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT) class ConfigDefaultsLinux(ConfigDefaults): def __init__(self): + self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/" self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini" self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf" self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem" pass - + def get_config_dir(self): + return self._CONFIG_DIR def get_config_file_path(self): return self._CONFIG_FILE_PATH def get_metric_file_path(self): @@ -71,6 +78,7 @@ configDefaults = ConfigDefaults() config = ConfigParser.RawConfigParser() +CONFIG_DIR = configDefaults.get_config_dir() CONFIG_FILE_PATH = configDefaults.get_config_file_path() METRIC_FILE_PATH = configDefaults.get_metric_file_path() CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path() @@ -191,6 +199,8 @@ class Configuration: # No hostname script identified in the ambari agent conf pass pass + def get_config_dir(self): + return CONFIG_DIR def getConfig(self): return self.config @@ -214,10 +224,14 @@ class Configuration: def get_hostname_config(self): return self.get("default", "hostname", None) - def get_metrics_collector_hosts(self): + def get_metrics_collector_hosts_as_list(self): hosts = self.get("default", "metrics_servers", 
"localhost") return hosts.split(",") + def get_metrics_collector_hosts_as_string(self): + hosts = self.get("default", "metrics_servers", "localhost") + return hosts + def get_failover_strategy(self): return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY) @@ -239,6 +253,23 @@ class Configuration: def is_server_https_enabled(self): return "true" == str(self.get("collector", "https_enabled")).lower() + def get_java_home(self): + return self.get("aggregation", "java_home") + + def is_inmemory_aggregation_enabled(self): + return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower() + + def get_inmemory_aggregation_port(self): + return self.get("aggregation", "host_in_memory_aggregation_port") + + def get_aggregator_jvm_agrs(self): + hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m") + return hosts + + def ams_monitor_log_dir(self): + hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor") + return hosts + def is_set_instanceid(self): return "true" == str(self.get("default", "set.instanceId", 'false')).lower()
