Repository: ambari Updated Branches: refs/heads/trunk ef9106762 -> 53dba8513
AMBARI-15952 Add container metrics in AMS (jianhe via avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/53dba851 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/53dba851 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/53dba851 Branch: refs/heads/trunk Commit: 53dba85138f4c520086ba6e3a7f4f2fcfe32dfec Parents: ef91067 Author: Aravindan Vijayan <[email protected]> Authored: Mon Apr 18 18:01:30 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Mon Apr 18 18:01:30 2016 -0700 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 25 ++- .../metrics2/sink/timeline/ContainerMetric.java | 218 +++++++++++++++++++ .../timeline/HadoopTimelineMetricsSink.java | 95 ++++++++ .../timeline/HBaseTimelineMetricStore.java | 8 + .../metrics/timeline/PhoenixHBaseAccessor.java | 80 ++++++- .../timeline/TimelineMetricConfiguration.java | 3 + .../metrics/timeline/TimelineMetricStore.java | 7 + .../timeline/query/PhoenixTransactSQL.java | 55 +++++ .../webapp/TimelineWebServices.java | 28 +++ .../timeline/ITPhoenixHBaseAccessor.java | 54 +++++ .../timeline/TestTimelineMetricStore.java | 7 + 11 files changed, 572 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 28d3b9c..5a532c5 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -75,17 +75,15 @@ public abstract class AbstractTimelineMetricsSink { LOG = LogFactory.getLog(this.getClass()); } - protected void emitMetrics(TimelineMetrics metrics) { - String connectUrl = getCollectorUri(); + protected void emitMetricsJson(String connectUrl, String jsonData) { int timeout = getTimeoutSeconds() * 1000; HttpURLConnection connection = null; try { if (connectUrl == null) { throw new IOException("Unknown URL. Unable to connect to metrics collector."); } - String jsonData = mapper.writeValueAsString(metrics); connection = connectUrl.startsWith("https") ? - getSSLConnection(connectUrl) : getConnection(connectUrl); + getSSLConnection(connectUrl) : getConnection(connectUrl); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); @@ -104,7 +102,7 @@ public abstract class AbstractTimelineMetricsSink { if (statusCode != 200) { LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " + - "statusCode = " + statusCode); + "statusCode = " + statusCode); } else { if (LOG.isDebugEnabled()) { LOG.debug("Metrics posted to Collector " + connectUrl); @@ -113,7 +111,7 @@ public abstract class AbstractTimelineMetricsSink { cleanupInputStream(connection.getInputStream()); } catch (IOException ioe) { StringBuilder errorMessage = - new StringBuilder("Unable to connect to collector, " + connectUrl + "\n"); + new StringBuilder("Unable to connect to collector, " + connectUrl + "\n"); try { if ((connection != null)) { errorMessage.append(cleanupInputStream(connection.getErrorStream())); @@ -130,6 +128,19 @@ public abstract class AbstractTimelineMetricsSink { } } + protected void emitMetrics(TimelineMetrics metrics) { + String connectUrl = getCollectorUri(); + String jsonData = null; + try { + jsonData = mapper.writeValueAsString(metrics); + } catch (IOException e) { + LOG.error("Unable to parse metrics", e); + } + if (jsonData != null) { + emitMetricsJson(connectUrl, jsonData); + } + } + /** * Cleans up and closes an input stream * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html @@ -137,7 +148,7 @@ public abstract class AbstractTimelineMetricsSink { * @return string read from the InputStream * @throws IOException */ - private String cleanupInputStream(InputStream is) throws IOException { + protected String cleanupInputStream(InputStream is) throws IOException { StringBuilder sb = new StringBuilder(); if (is != null) { try ( http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/ContainerMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/ContainerMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/ContainerMetric.java new file mode 100644 index 0000000..0e2051b --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/ContainerMetric.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.sink.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; + +/** + * This differs from TimelineMetric in that this class contains all the fields + * for a single metric. + */ +@XmlRootElement(name = "containermetric") +@XmlAccessorType(XmlAccessType.FIELD) [email protected] [email protected] +public class ContainerMetric { + private String hostName; + private String containerId; + private int pmemLimit; + private int vmemLimit; + private int pmemUsedAvg; + private int pmemUsedMin; + private int pmemUsedMax; + private int pmem50Pct; + private int pmem75Pct; + private int pmem90Pct; + private int pmem95Pct; + private int pmem99Pct; + private long launchDuration; + private long localizationDuration; + private long startTime; + private long finishTime; + private int exitCode; + + + public ContainerMetric() { + + } + + public String getHostName() { + return hostName; + } + + public void setHostName(String hostName) { + this.hostName = hostName; + } + + public String getContainerId() { + return containerId; + } + + public void setContainerId(String containerId) { + this.containerId = containerId; + } + + public int getPmemLimit() { + return pmemLimit; + } + + public void setPmemLimit(int pmemLimit) { + this.pmemLimit = pmemLimit; + } + + public int getVmemLimit() { + return vmemLimit; + } + + public void setVmemLimit(int vmemLimit) { + this.vmemLimit = vmemLimit; + } + + public int getPmemUsedAvg() { + return pmemUsedAvg; + } + + public void setPmemUsedAvg(int pmemUsedAvg) { + this.pmemUsedAvg = pmemUsedAvg; + } + + public int getPmemUsedMin() { + return pmemUsedMin; + } + + public void setPmemUsedMin(int pmemUsedMin) { + this.pmemUsedMin = pmemUsedMin; + } + + public int getPmemUsedMax() { + return pmemUsedMax; + } + + public void setPmemUsedMax(int pmemUsedMax) { + this.pmemUsedMax = pmemUsedMax; + } + + public int getPmem50Pct() { + return pmem50Pct; + } + + public void setPmem50Pct(int pmem50Pct) { + this.pmem50Pct = pmem50Pct; + } + + public int getPmem75Pct() { + return pmem75Pct; + } + + public void setPmem75Pct(int pmem75Pct) { + this.pmem75Pct = pmem75Pct; + } + + public int getPmem90Pct() { + return pmem90Pct; + } + + public void setPmem90Pct(int pmem90Pct) { + this.pmem90Pct = pmem90Pct; + } + + public int getPmem95Pct() { + return pmem95Pct; + } + + public void setPmem95Pct(int pmem95Pct) { + this.pmem95Pct = pmem95Pct; + } + + public int getPmem99Pct() { + return pmem99Pct; + } + + public void setPmem99Pct(int pmem99Pct) { + this.pmem99Pct = pmem99Pct; + } + + public long getLaunchDuration() { + return launchDuration; + } + + public void setLaunchDuration(long launchDuration) { + this.launchDuration = launchDuration; + } + + public long getLocalizationDuration() { + return localizationDuration; + } + + public void setLocalizationDuration(long localizationDuration) { + this.localizationDuration = localizationDuration; + } + + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + + public long getFinishTime() { + return finishTime; + } + + public void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + public int getExitCode() { + return exitCode; + } + + public void setExitCode(int exitCode) { + this.exitCode = exitCode; + } + + @Override + public String toString() { + return "ContainerMetric{" + + "hostName='" + hostName + '\'' + + ", containerId='" + containerId + '\'' + + ", pmemLimit=" + pmemLimit + + ", vmemLimit=" + vmemLimit + + ", pmemUsedAvg=" + pmemUsedAvg + + ", pmemUsedMin=" + pmemUsedMin + + ", pmemUsedMax=" + pmemUsedMax + + ", pmem50Pct=" + pmem50Pct + + ", pmem75Pct=" + pmem75Pct + + ", pmem90Pct=" + pmem90Pct + + ", pmem95Pct=" + pmem95Pct + + ", pmem99Pct=" + pmem99Pct + + ", launchDuration=" + launchDuration + + ", localizationDuration=" + localizationDuration + + ", startTime=" + startTime + + ", finishTime=" + finishTime + + ", exitCode=" + exitCode + + '}'; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index db8791f..35b9459 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -31,6 +31,9 @@ import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.util.Servers; import org.apache.hadoop.net.DNS; +import java.io.IOException; +import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -51,6 +54,9 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple private String serviceName = ""; private List<? extends SocketAddress> metricsServers; private String collectorUri; + private String containerMetricsUri; + public static final String WS_V1_CONTAINER_METRICS = "/ws/v1/timeline/containermetrics"; + private static final String SERVICE_NAME_PREFIX = "serviceName-prefix"; private static final String SERVICE_NAME = "serviceName"; private int timeoutSeconds = 10; @@ -88,6 +94,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple LOG.error("No Metric collector configured."); } else { collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_TIMELINE_METRICS; + containerMetricsUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_CONTAINER_METRICS; if (collectorUri.toLowerCase().startsWith("https://")) { String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim(); @@ -220,6 +227,11 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } } + if (record.context().equals("container")) { + emitContainerMetrics(record); + return; + } + int sbBaseLen = sb.length(); Collection<AbstractMetric> metrics = (Collection<AbstractMetric>) record.metrics(); @@ -263,6 +275,89 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } } + private void parseContainerMetrics(MetricsRecord record, + ContainerMetric containerMetric) { + for (AbstractMetric metric : record.metrics() ) { + switch (metric.name()) { + case "PMemUsageMBsAvgMBs": + containerMetric.setPmemUsedAvg(metric.value().intValue()); + break; + case "PMemUsageMBsMinMBs": + containerMetric.setPmemUsedMin(metric.value().intValue()); + break; + case "PMemUsageMBsMaxMBs": + containerMetric.setPmemUsedMax(metric.value().intValue()); + break; + case "PMemUsageMBHistogram50thPercentileMBs": + containerMetric.setPmem50Pct(metric.value().intValue()); + break; + case "PMemUsageMBHistogram75thPercentileMBs": + containerMetric.setPmem75Pct(metric.value().intValue()); + break; + case "PMemUsageMBHistogram90thPercentileMBs": + containerMetric.setPmem90Pct(metric.value().intValue()); + break; + case "PMemUsageMBHistogram95thPercentileMBs": + containerMetric.setPmem95Pct(metric.value().intValue()); + break; + case "PMemUsageMBHistogram99thPercentileMBs": + containerMetric.setPmem99Pct(metric.value().intValue()); + break; + case "pMemLimitMBs": + containerMetric.setPmemLimit(metric.value().intValue()); + break; + case "vMemLimitMBs": + containerMetric.setVmemLimit(metric.value().intValue()); + break; + case "launchDurationMs": + containerMetric.setLaunchDuration(metric.value().longValue()); + break; + case "localizationDurationMs": + containerMetric.setLocalizationDuration(metric.value().longValue()); + break; + case "StartTime": + containerMetric.setStartTime(metric.value().longValue()); + break; + case "FinishTime": + containerMetric.setFinishTime(metric.value().longValue()); + break; + case "ExitCode": + containerMetric.setExitCode((metric.value().intValue())); + break; + default: + break; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(containerMetric); + } + } + + private void emitContainerMetrics(MetricsRecord record) { + + ContainerMetric containerMetric = new ContainerMetric(); + containerMetric.setHostName(hostName); + + for (MetricsTag tag : record.tags()) { + if (tag.name().equals("ContainerResource")) { + containerMetric.setContainerId(tag.value()); + } + } + + parseContainerMetrics(record, containerMetric); + List<ContainerMetric> list = new ArrayList<>(); + list.add(containerMetric); + String jsonData = null; + try { + jsonData = mapper.writeValueAsString(list); + } catch (IOException e) { + LOG.error("Unable to parse container metrics ", e); + } + if (jsonData != null) { + emitMetricsJson(containerMetricsUri, jsonData); + } + } + // Taken as is from Ganglia30 implementation @InterfaceAudience.Private public void appendPrefix(MetricsRecord record, StringBuilder sb) { http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index ab11333..89c67d1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; @@ -336,6 +337,13 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin } @Override + public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) + throws SQLException, IOException { + hBaseAccessor.insertContainerMetrics(metrics); + return new TimelinePutResponse(); + } + + @Override public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata() throws SQLException, IOException { Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = metricMetadataManager.getMetadataCache(); http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index e434d33..6a76bf1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate; @@ -57,6 +59,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -86,6 +89,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL; @@ -111,6 +117,8 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL; + /** * Provides a facade over the Phoenix API to access HBase schema @@ -180,6 +188,7 @@ public class PhoenixHBaseAccessor { this.timelineMetricsTablesDurability = metricsConf.get(TIMELINE_METRICS_TABLES_DURABILITY, ""); tableTTL.put(METRICS_RECORD_TABLE_NAME, metricsConf.get(PRECISION_TABLE_TTL, String.valueOf(1 * 86400))); // 1 day + tableTTL.put(CONTAINER_METRICS_TABLE_NAME, metricsConf.get(CONTAINER_METRICS_TTL, String.valueOf(30 * 86400))); // 30 days tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, metricsConf.get(HOST_MINUTE_TABLE_TTL, String.valueOf(7 * 86400))); //7 days tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, metricsConf.get(HOST_HOUR_TABLE_TTL, String.valueOf(30 * 86400))); //30 days tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, metricsConf.get(HOST_DAILY_TABLE_TTL, String.valueOf(365 * 86400))); //1 year @@ -271,6 +280,10 @@ public class PhoenixHBaseAccessor { encoding, compression); stmt.executeUpdate(hostedAppSql); + // Container Metrics + stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL, + encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression)); + // Host level String precisionSql = String.format(CREATE_METRICS_TABLE_SQL, encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression); @@ -462,6 +475,71 @@ public class PhoenixHBaseAccessor { return ""; } + public void insertContainerMetrics(List<ContainerMetric> metrics) + throws SQLException, IOException { + Connection conn = getConnection(); + PreparedStatement metricRecordStmt = null; + + try { + metricRecordStmt = conn.prepareStatement( + String.format(UPSERT_CONTAINER_METRICS_SQL, CONTAINER_METRICS_TABLE_NAME)); + for (ContainerMetric metric : metrics) { + metricRecordStmt.clearParameters(); + metricRecordStmt.setString(1, ContainerId.fromString(metric.getContainerId()) + .getApplicationAttemptId().getApplicationId().toString()); + metricRecordStmt.setString(2, metric.getContainerId()); + metricRecordStmt.setTimestamp(3, new Timestamp(metric.getStartTime())); + metricRecordStmt.setTimestamp(4, new Timestamp(metric.getFinishTime())); + metricRecordStmt.setLong(5, metric.getFinishTime() - metric.getStartTime()); + metricRecordStmt.setString(6, metric.getHostName()); + metricRecordStmt.setInt(7, metric.getExitCode()); + metricRecordStmt.setLong(8, metric.getLocalizationDuration()); + metricRecordStmt.setLong(9, metric.getLaunchDuration()); + metricRecordStmt.setDouble(10, (double) metric.getPmemLimit() / 1024); + metricRecordStmt.setDouble(11, + ((double) metric.getPmemLimit() / 1024) * (metric.getFinishTime() + - metric.getStartTime())); + metricRecordStmt.setDouble(12, (double) metric.getVmemLimit() / 1024); + metricRecordStmt.setDouble(13, (double) metric.getPmemUsedMin() / 1024); + metricRecordStmt.setDouble(14, (double) metric.getPmemUsedMax() / 1024); + metricRecordStmt.setDouble(15, (double) metric.getPmemUsedAvg() / 1024); + metricRecordStmt.setDouble(16, (double) metric.getPmem50Pct() / 1024); + metricRecordStmt.setDouble(17, (double) metric.getPmem75Pct() / 1024); + metricRecordStmt.setDouble(18, (double) metric.getPmem90Pct() / 1024); + metricRecordStmt.setDouble(19, (double) metric.getPmem95Pct()/ 1024); + metricRecordStmt.setDouble(20, (double) metric.getPmem99Pct() / 1024); + metricRecordStmt.setDouble(21, (double) metric.getPmemLimit() / 1024 + - (double) metric.getPmemUsedMax() / 1024); + metricRecordStmt.setDouble(22, ((double) metric.getPmemLimit() / 1024 + - (double) metric.getPmemUsedMax() / 1024) * (metric.getFinishTime() + - metric.getStartTime())); + + try { + metricRecordStmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error("Failed on insert records to store.", sql); + } + } + + conn.commit(); + } finally { + if (metricRecordStmt != null) { + try { + metricRecordStmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + } + public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager, TimelineMetrics metrics) throws SQLException, IOException { List<TimelineMetric> timelineMetrics = metrics.getMetrics(); @@ -860,7 +938,7 @@ public class PhoenixHBaseAccessor { } if (stmt != null) { stmt.close(); - } + } } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index c0093a0..b15553f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -54,6 +54,9 @@ public class TimelineMetricConfiguration { public static final String HBASE_COMPRESSION_SCHEME = "timeline.metrics.hbase.compression.scheme"; + public static final String CONTAINER_METRICS_TTL = + "timeline.container-metrics.ttl"; + public static final String PRECISION_TABLE_TTL = "timeline.metrics.host.aggregator.ttl"; http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index 2f08f3f..ded64e3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; @@ -73,6 +74,12 @@ public interface TimelineMetricStore { TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException; /** + * Store container metric into the timeliens tore + */ + TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) + throws SQLException, IOException; + + /** * Return all metrics metadata that have been written to the store. * @return { appId : [ @TimelineMetricMetadata ] } * @throws SQLException http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java index 0efa68f..6ee0006 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java @@ -59,6 +59,33 @@ public class PhoenixTransactSQL { "INSTANCE_ID)) DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " + "TTL=%s, COMPRESSION='%s'"; + public static final String CREATE_CONTAINER_METRICS_TABLE_SQL = + "CREATE TABLE IF NOT EXISTS CONTAINER_METRICS " + + "(APP_ID VARCHAR, " + + " CONTAINER_ID VARCHAR," + + " START_TIME TIMESTAMP," + + " FINISH_TIME TIMESTAMP, " + + " DURATION BIGINT," + + " HOSTNAME VARCHAR," + + " EXIT_CODE INTEGER," + + " LOCALIZATION_DURATION BIGINT," + + " LAUNCH_DURATION BIGINT," + + " MEM_REQUESTED_GB DOUBLE," + + " MEM_REQUESTED_GB_MILLIS DOUBLE," + + " MEM_VIRTUAL_GB DOUBLE," + + " MEM_USED_GB_MIN DOUBLE," + + " MEM_USED_GB_MAX DOUBLE," + + " MEM_USED_GB_AVG DOUBLE," + + " MEM_USED_GB_50_PCT DOUBLE," + + " MEM_USED_GB_75_PCT DOUBLE," + + " MEM_USED_GB_90_PCT DOUBLE," + + " MEM_USED_GB_95_PCT DOUBLE," + + " MEM_USED_GB_99_PCT DOUBLE," + + " MEM_UNUSED_GB DOUBLE," + + " MEM_UNUSED_GB_MILLIS DOUBLE " + + " CONSTRAINT pk PRIMARY KEY(APP_ID, CONTAINER_ID)) DATA_BLOCK_ENCODING='%s'," + + " IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'"; + public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS %s " + "(METRIC_NAME VARCHAR, " + @@ -141,6 +168,31 @@ public class PhoenixTransactSQL { "METRICS) VALUES " + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + public static final String UPSERT_CONTAINER_METRICS_SQL = "UPSERT INTO %s " + + "(APP_ID," + + " CONTAINER_ID," + + " START_TIME," + + " FINISH_TIME," + + " DURATION," + + " HOSTNAME," + + " EXIT_CODE," + + " LOCALIZATION_DURATION," + + " LAUNCH_DURATION," + + " MEM_REQUESTED_GB," + + " MEM_REQUESTED_GB_MILLIS," + + " MEM_VIRTUAL_GB," + + " MEM_USED_GB_MIN," + + " MEM_USED_GB_MAX," + + " MEM_USED_GB_AVG," + + " MEM_USED_GB_50_PCT," + + " MEM_USED_GB_75_PCT," + + " MEM_USED_GB_90_PCT," + + " MEM_USED_GB_95_PCT," + + " MEM_USED_GB_99_PCT," + + " MEM_UNUSED_GB," + + " MEM_UNUSED_GB_MILLIS) VALUES " + + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " + "%s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, " + "UNITS, " + @@ -277,6 +329,9 @@ public class PhoenixTransactSQL { "SERVER_TIME <= %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS"; public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD"; + + public static final String CONTAINER_METRICS_TABLE_NAME = "CONTAINER_METRICS"; + public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME = "METRIC_RECORD_MINUTE"; public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME = http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index d2526a0..4cfc415 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -24,6 +24,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; @@ -282,6 +284,32 @@ public class TimelineWebServices { } } + @Path("/containermetrics") + @POST + @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */}) + public TimelinePutResponse postContainerMetrics( + @Context HttpServletRequest req, + @Context HttpServletResponse res, + List<ContainerMetric> metrics) { + init(res); + if (metrics == null || metrics.isEmpty()) { + return new TimelinePutResponse(); + } + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing container metrics: " + TimelineUtils + .dumpTimelineRecordtoJSON(metrics, true)); + } + + return timelineMetricStore.putContainerMetrics(metrics); + + } catch (Exception e) { + LOG.error("Error saving metrics.", e); + throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR); + } + } + /** * Query for a particular metric satisfying the filter criteria. * @return {@link TimelineMetric} http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index f61d619..e98f19e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; @@ -38,8 +39,15 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.junit.Test; import java.io.IOException; +import java.lang.reflect.Array; import java.lang.reflect.Field; import java.sql.SQLException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -375,4 +383,50 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { private Map<String, List<Function>> singletonValueFunctionMap(String metricName) { return Collections.singletonMap(metricName, Collections.singletonList(new Function())); } + + @Test + public void testInsertContainerMetrics() throws Exception { + ContainerMetric metric = new ContainerMetric(); + metric.setContainerId("container_1450744875949_0001_01_000001"); + metric.setHostName("host1"); + metric.setPmemLimit(2048); + metric.setVmemLimit(2048); + metric.setPmemUsedAvg(1024); + metric.setPmemUsedMin(1024); + metric.setPmemUsedMax(1024); + metric.setLaunchDuration(2000); + metric.setLocalizationDuration(3000); + long startTime = System.currentTimeMillis(); + long finishTime = startTime + 5000; + metric.setStartTime(startTime); + metric.setFinishTime(finishTime); + metric.setExitCode(0); + List<ContainerMetric> list = Arrays.asList(metric); + hdb.insertContainerMetrics(list); + PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS"); + ResultSet set = stmt.executeQuery(); + // check each filed is set properly when read back. + boolean foundRecord = false; + while (set.next()) { + assertEquals("application_1450744875949_0001", set.getString("APP_ID")); + assertEquals("container_1450744875949_0001_01_000001", set.getString("CONTAINER_ID")); + assertEquals(new java.sql.Timestamp(startTime), set.getTimestamp("START_TIME")); + assertEquals(new java.sql.Timestamp(finishTime), set.getTimestamp("FINISH_TIME")); + assertEquals(5000, set.getLong("DURATION")); + assertEquals("host1", set.getString("HOSTNAME")); + assertEquals(0, set.getInt("EXIT_CODE")); + assertEquals(3000, set.getLong("LOCALIZATION_DURATION")); + assertEquals(2000, set.getLong("LAUNCH_DURATION")); + assertEquals((double)2, set.getDouble("MEM_REQUESTED_GB")); + assertEquals((double)2 * 5000, set.getDouble("MEM_REQUESTED_GB_MILLIS")); + assertEquals((double)2, set.getDouble("MEM_VIRTUAL_GB")); + assertEquals((double)1, set.getDouble("MEM_USED_GB_MIN")); + assertEquals((double)1, set.getDouble("MEM_USED_GB_MAX")); + assertEquals((double)1, set.getDouble("MEM_USED_GB_AVG")); + assertEquals((double)(2 - 1), set.getDouble("MEM_UNUSED_GB")); + assertEquals((double)(2-1) * 5000, set.getDouble("MEM_UNUSED_GB_MILLIS")); + foundRecord = true; + } + assertTrue(foundRecord); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/53dba851/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java index 61d6e71..16bbf0e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; import org.apache.hadoop.metrics2.sink.timeline.Precision; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; @@ -86,6 +87,12 @@ public class TestTimelineMetricStore implements TimelineMetricStore { } @Override + public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) + throws SQLException, IOException { + return new TimelinePutResponse(); + } + + @Override public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata() throws SQLException, IOException { return null; }
