AMBARI-21214 : Use a uuid vs long row key for metrics in AMS schema. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/041e4e9a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/041e4e9a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/041e4e9a Branch: refs/heads/branch-3.0-ams Commit: 041e4e9a1a3e6aa36b2bbf3f1b0e86b8d70fc6b8 Parents: 82e6229 Author: Aravindan Vijayan <[email protected]> Authored: Mon Jun 19 10:55:44 2017 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Mon Jun 19 10:55:44 2017 -0700 ---------------------------------------------------------------------- .../timeline/SingleValuedTimelineMetric.java | 9 +- .../metrics2/sink/timeline/TimelineMetric.java | 8 + .../sink/timeline/TimelineMetricMetadata.java | 37 +- .../timeline/HBaseTimelineMetricsService.java | 43 +- .../metrics/timeline/PhoenixHBaseAccessor.java | 178 +- .../timeline/TimelineMetricConfiguration.java | 3 + .../metrics/timeline/TimelineMetricStore.java | 12 +- .../metrics/timeline/TimelineMetricsFilter.java | 7 - .../aggregators/AbstractTimelineAggregator.java | 45 +- .../aggregators/TimelineClusterMetric.java | 6 +- .../TimelineMetricAggregatorFactory.java | 12 + .../TimelineMetricAppAggregator.java | 28 +- .../TimelineMetricClusterAggregator.java | 9 +- .../TimelineMetricClusterAggregatorSecond.java | 24 +- .../TimelineMetricHostAggregator.java | 10 +- .../aggregators/TimelineMetricReadHelper.java | 61 +- .../discovery/TimelineMetricHostMetadata.java | 51 + .../discovery/TimelineMetricMetadataKey.java | 26 +- .../TimelineMetricMetadataManager.java | 290 +++- .../discovery/TimelineMetricMetadataSync.java | 18 +- .../metrics/timeline/query/Condition.java | 1 + .../timeline/query/ConditionBuilder.java | 10 +- .../timeline/query/DefaultCondition.java | 60 +- .../metrics/timeline/query/EmptyCondition.java | 5 + .../timeline/query/PhoenixTransactSQL.java | 277 +-- .../query/SplitByMetricNamesCondition.java | 40 +- 
.../metrics/timeline/query/TopNCondition.java | 63 +- .../timeline/uuid/HashBasedUuidGenStrategy.java | 202 +++ .../timeline/uuid/MetricUuidGenStrategy.java | 49 + .../timeline/uuid/RandomUuidGenStrategy.java | 53 + .../webapp/TimelineWebServices.java | 17 + .../resources/metrics_def/AMBARI_SERVER.dat | 40 + .../resources/metrics_def/JOBHISTORYSERVER.dat | 58 + .../main/resources/metrics_def/MASTER_HBASE.dat | 230 ++- .../main/resources/metrics_def/SLAVE_HBASE.dat | 700 ++++++-- .../timeline/ITPhoenixHBaseAccessor.java | 6 +- .../metrics/timeline/MetricTestHelper.java | 2 +- .../timeline/PhoenixHBaseAccessorTest.java | 10 +- .../timeline/TestPhoenixTransactSQL.java | 105 +- .../timeline/TestTimelineMetricStore.java | 10 + .../TimelineMetricsAggregatorMemorySink.java | 4 +- .../timeline/aggregators/DownSamplerTest.java | 2 + .../aggregators/ITClusterAggregator.java | 15 +- .../aggregators/ITMetricAggregator.java | 8 +- ...melineMetricClusterAggregatorSecondTest.java | 65 +- .../timeline/discovery/TestMetadataManager.java | 173 +- .../timeline/discovery/TestMetadataSync.java | 32 +- .../uuid/TimelineMetricUuidManagerTest.java | 184 ++ .../test/resources/test_data/full_whitelist.dat | 1615 ++++++++++++++++++ 49 files changed, 4051 insertions(+), 862 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java index 8ecca54..4bb9355 100644 --- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/SingleValuedTimelineMetric.java @@ -30,7 +30,6 @@ public class SingleValuedTimelineMetric { private String instanceId; private String hostName; private Long startTime; - private String type; public void setSingleTimeseriesValue(Long timestamp, Double value) { this.timestamp = timestamp; @@ -39,14 +38,13 @@ public class SingleValuedTimelineMetric { public SingleValuedTimelineMetric(String metricName, String appId, String instanceId, String hostName, - long timestamp, long startTime, String type) { + long timestamp, long startTime) { this.metricName = metricName; this.appId = appId; this.instanceId = instanceId; this.hostName = hostName; this.timestamp = timestamp; this.startTime = startTime; - this.type = type; } public Long getTimestamp() { @@ -57,10 +55,6 @@ public class SingleValuedTimelineMetric { return startTime; } - public String getType() { - return type; - } - public Double getValue() { return value; } @@ -97,7 +91,6 @@ public class SingleValuedTimelineMetric { metric.setMetricName(this.metricName); metric.setAppId(this.appId); metric.setHostName(this.hostName); - metric.setType(this.type); metric.setInstanceId(this.instanceId); metric.setStartTime(this.startTime); metric.setTimestamp(this.timestamp); http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java index edace52..3d3b19c 100644 --- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -52,6 +52,14 @@ public class TimelineMetric implements Comparable<TimelineMetric> { } + // To reconstruct TimelineMetric from UUID. + public TimelineMetric(String metricName, String hostname, String appId, String instanceId) { + this.metricName = metricName; + this.hostName = hostname; + this.appId = appId; + this.instanceId = instanceId; + } + // copy constructor public TimelineMetric(TimelineMetric metric) { setMetricName(metric.getMetricName()); http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java index 727becc..6c9712f 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricMetadata.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.metrics2.sink.timeline; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.codehaus.jackson.annotate.JsonIgnore; @@ -32,6 +34,8 @@ import javax.xml.bind.annotation.XmlRootElement; public class TimelineMetricMetadata { private String metricName; private String appId; + private String instanceId; + private 
byte[] uuid; private String units; private String type = "UNDEFINED"; private Long seriesStartTime; @@ -51,11 +55,12 @@ public class TimelineMetricMetadata { public TimelineMetricMetadata() { } - public TimelineMetricMetadata(String metricName, String appId, String units, + public TimelineMetricMetadata(String metricName, String appId, String instanceId, String units, String type, Long seriesStartTime, boolean supportsAggregates, boolean isWhitelisted) { this.metricName = metricName; this.appId = appId; + this.instanceId = instanceId; this.units = units; this.type = type; this.seriesStartTime = seriesStartTime; @@ -82,6 +87,24 @@ public class TimelineMetricMetadata { this.appId = appId; } + @XmlElement(name = "instanceId") + public String getInstanceId() { + return instanceId; + } + + public void setInstanceId(String instanceId) { + this.instanceId = instanceId; + } + + @XmlElement(name = "uuid") + public byte[] getUuid() { + return uuid; + } + + public void setUuid(byte[] uuid) { + this.uuid = uuid; + } + @XmlElement(name = "units") public String getUnits() { return units; @@ -102,7 +125,7 @@ public class TimelineMetricMetadata { @XmlElement(name = "seriesStartTime") public Long getSeriesStartTime() { - return seriesStartTime; + return (seriesStartTime != null) ? seriesStartTime : 0l; } public void setSeriesStartTime(Long seriesStartTime) { @@ -138,9 +161,10 @@ public class TimelineMetricMetadata { */ public boolean needsToBeSynced(TimelineMetricMetadata metadata) throws MetadataException { if (!this.metricName.equals(metadata.getMetricName()) || - !this.appId.equals(metadata.getAppId())) { + !this.appId.equals(metadata.getAppId()) || + !(StringUtils.isNotEmpty(instanceId) ? 
instanceId.equals(metadata.instanceId) : StringUtils.isEmpty(metadata.instanceId))) { throw new MetadataException("Unexpected argument: metricName = " + - metadata.getMetricName() + ", appId = " + metadata.getAppId()); + metadata.getMetricName() + ", appId = " + metadata.getAppId() + ", instanceId = " + metadata.getInstanceId()); } // Series start time should never change @@ -159,14 +183,15 @@ public class TimelineMetricMetadata { TimelineMetricMetadata that = (TimelineMetricMetadata) o; if (!metricName.equals(that.metricName)) return false; - return !(appId != null ? !appId.equals(that.appId) : that.appId != null); - + if (!appId.equals(that.appId)) return false; + return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); } @Override public int hashCode() { int result = metricName.hashCode(); result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); return result; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java index f962f44..66c46db 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import org.apache.ambari.metrics.alertservice.spark.AmsKafkaProducer; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; @@ -82,7 +84,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time private TimelineMetricMetadataManager metricMetadataManager; private Integer defaultTopNHostsLimit; private MetricCollectorHAController haController; - private AmsKafkaProducer kafkaProducer; /** * Construct the service. 
@@ -143,8 +144,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time LOG.info("Using group by aggregators for aggregating host and cluster metrics."); } - kafkaProducer = new AmsKafkaProducer(metricsConf.get("kafka.bootstrap.servers")); //104.196.85.21:6667 - // Start the cluster aggregator second TimelineMetricAggregator secondClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( @@ -154,19 +153,19 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( - hBaseAccessor, metricsConf, haController); + hBaseAccessor, metricsConf, metricMetadataManager, haController); scheduleAggregatorThread(minuteClusterAggregator); // Start the hourly cluster aggregator TimelineMetricAggregator hourlyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( - hBaseAccessor, metricsConf, haController); + hBaseAccessor, metricsConf, metricMetadataManager, haController); scheduleAggregatorThread(hourlyClusterAggregator); // Start the daily cluster aggregator TimelineMetricAggregator dailyClusterAggregator = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( - hBaseAccessor, metricsConf, haController); + hBaseAccessor, metricsConf, metricMetadataManager, haController); scheduleAggregatorThread(dailyClusterAggregator); // Start the minute host aggregator @@ -175,20 +174,20 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time } else { TimelineMetricAggregator minuteHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( - hBaseAccessor, metricsConf, haController); + hBaseAccessor, metricsConf, metricMetadataManager, haController); scheduleAggregatorThread(minuteHostAggregator); } // Start the hourly host aggregator 
TimelineMetricAggregator hourlyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( - hBaseAccessor, metricsConf, haController); + hBaseAccessor, metricsConf, metricMetadataManager, haController); scheduleAggregatorThread(hourlyHostAggregator); // Start the daily host aggregator TimelineMetricAggregator dailyHostAggregator = TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( - hBaseAccessor, metricsConf, haController); + hBaseAccessor, metricsConf, metricMetadataManager, haController); scheduleAggregatorThread(dailyHostAggregator); if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { @@ -238,6 +237,8 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time Multimap<String, List<Function>> metricFunctions = parseMetricNamesToAggregationFunctions(metricNames); + List<byte[]> uuids = metricMetadataManager.getUuids(metricFunctions.keySet(), hostnames, applicationId, instanceId); + ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) .hostnames(hostnames) .appId(applicationId) @@ -246,7 +247,8 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time .endTime(endTime) .precision(precision) .limit(limit) - .grouped(groupedByHosts); + .grouped(groupedByHosts) + .uuid(uuids); if (topNConfig != null) { if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true. 
@@ -368,13 +370,6 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time // Error indicated by the Sql exception TimelinePutResponse response = new TimelinePutResponse(); - try { - if (!metrics.getMetrics().isEmpty() && metrics.getMetrics().get(0).getAppId().equals("HOST")) { - kafkaProducer.sendMetrics(fromTimelineMetrics(metrics)); - } - } catch (InterruptedException | ExecutionException e) { - LOG.error(e); - } hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); return response; @@ -439,8 +434,18 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time } @Override + public Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException { + return metricMetadataManager.getUuidKeyMap(); + } + + @Override public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { - return metricMetadataManager.getHostedAppsCache(); + Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache(); + Map<String, Set<String>> hostAppMap = new HashMap<>(); + for (String hostname : hostsMetadata.keySet()) { + hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps()); + } + return hostAppMap; } @Override @@ -459,7 +464,7 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException { - Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache(); + Map<String, Set<String>> hostedApps = getHostAppsMetadata(); Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache(); Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>(); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 15b0bb8..7ad88a1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -132,6 +132,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; @@ -171,7 +172,7 @@ public class PhoenixHBaseAccessor { private static final int POINTS_PER_MINUTE = 6; public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * METRICS_PER_MINUTE * POINTS_PER_MINUTE ; - static final TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); + static TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(); static ObjectMapper mapper = new ObjectMapper(); static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new TypeReference<TreeMap<Long, Double>>() {}; @@ -190,6 +191,7 @@ public class PhoenixHBaseAccessor { private final boolean skipBlockCacheForAggregatorsEnabled; private final String timelineMetricsTablesDurability; private final String timelineMetricsPrecisionTableDurability; + private TimelineMetricMetadataManager metadataManagerInstance; static final String HSTORE_COMPACTION_CLASS_KEY = "hbase.hstore.defaultengine.compactionpolicy.class"; @@ -282,6 +284,7 @@ public class PhoenixHBaseAccessor { } rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, rawMetricsSink); } + TIMELINE_METRIC_READ_HELPER = new 
TimelineMetricReadHelper(this.metadataManagerInstance); } public boolean isInsertCacheEmpty() { @@ -336,19 +339,20 @@ public class PhoenixHBaseAccessor { double[] aggregates = AggregatorUtils.calculateAggregates( metric.getMetricValues()); - metricRecordStmt.setString(1, metric.getMetricName()); - metricRecordStmt.setString(2, metric.getHostName()); - metricRecordStmt.setString(3, metric.getAppId()); - metricRecordStmt.setString(4, metric.getInstanceId()); - metricRecordStmt.setLong(5, currentTime); - metricRecordStmt.setLong(6, metric.getStartTime()); - metricRecordStmt.setString(7, metric.getUnits()); - metricRecordStmt.setDouble(8, aggregates[0]); - metricRecordStmt.setDouble(9, aggregates[1]); - metricRecordStmt.setDouble(10, aggregates[2]); - metricRecordStmt.setLong(11, (long) aggregates[3]); + byte[] uuid = metadataManagerInstance.getUuid(metric); + if (uuid == null) { + LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString()); + continue; + } + metricRecordStmt.setBytes(1, uuid); + metricRecordStmt.setLong(2, currentTime); + metricRecordStmt.setLong(3, metric.getStartTime()); + metricRecordStmt.setDouble(4, aggregates[0]); + metricRecordStmt.setDouble(5, aggregates[1]); + metricRecordStmt.setDouble(6, aggregates[2]); + metricRecordStmt.setLong(7, (long) aggregates[3]); String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); - metricRecordStmt.setString(12, json); + metricRecordStmt.setString(8, json); try { metricRecordStmt.executeUpdate(); @@ -477,20 +481,12 @@ public class PhoenixHBaseAccessor { // Host level String precisionSql = String.format(CREATE_METRICS_TABLE_SQL, encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression); - String splitPoints = metricsConf.get(PRECISION_TABLE_SPLIT_POINTS); - if (!StringUtils.isEmpty(splitPoints)) { - precisionSql += getSplitPointsStr(splitPoints); - } stmt.executeUpdate(precisionSql); String hostMinuteAggregrateSql = 
String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding, tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME), compression); - splitPoints = metricsConf.get(AGGREGATE_TABLE_SPLIT_POINTS); - if (!StringUtils.isEmpty(splitPoints)) { - hostMinuteAggregrateSql += getSplitPointsStr(splitPoints); - } stmt.executeUpdate(hostMinuteAggregrateSql); stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL, @@ -507,10 +503,7 @@ public class PhoenixHBaseAccessor { METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding, tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME), compression); - splitPoints = metricsConf.get(AGGREGATE_TABLE_SPLIT_POINTS); - if (!StringUtils.isEmpty(splitPoints)) { - aggregateSql += getSplitPointsStr(splitPoints); - } + stmt.executeUpdate(aggregateSql); stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL, METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding, @@ -961,7 +954,8 @@ public class PhoenixHBaseAccessor { private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition, Multimap<String, List<Function>> metricFunctions, ResultSet rs) throws SQLException, IOException { - String metricName = rs.getString("METRIC_NAME"); + byte[] uuid = rs.getBytes("UUID"); + String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid); Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); for (List<Function> functions : functionList) { @@ -1103,7 +1097,8 @@ public class PhoenixHBaseAccessor { Condition condition, Multimap<String, List<Function>> metricFunctions, ResultSet rs) throws SQLException { - String metricName = rs.getString("METRIC_NAME"); + byte[] uuid = rs.getBytes("UUID"); + String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid); Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); for (List<Function> functions : functionList) { @@ -1136,14 +1131,15 @@ public 
class PhoenixHBaseAccessor { SplitByMetricNamesCondition splitCondition = new SplitByMetricNamesCondition(condition); - for (String metricName: splitCondition.getOriginalMetricNames()) { + for (byte[] uuid: condition.getUuids()) { - splitCondition.setCurrentMetric(metricName); + splitCondition.setCurrentUuid(uuid); stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, splitCondition); ResultSet rs = null; try { rs = stmt.executeQuery(); while (rs.next()) { + String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid); Collection<List<Function>> functionList = findMetricFunctions(metricFunctions, metricName); for (List<Function> functions : functionList) { if (functions != null) { @@ -1187,14 +1183,16 @@ public class PhoenixHBaseAccessor { countColumnName = "HOSTS_COUNT"; } + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); + SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( - rs.getString("METRIC_NAME") + f.getSuffix(), - rs.getString("APP_ID"), - rs.getString("INSTANCE_ID"), + timelineMetric.getMetricName() + f.getSuffix(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), null, rs.getLong("SERVER_TIME"), - rs.getLong("SERVER_TIME"), - rs.getString("UNITS") + rs.getLong("SERVER_TIME") ); double value; @@ -1277,18 +1275,19 @@ public class PhoenixHBaseAccessor { TimelineMetric metric = metricAggregate.getKey(); MetricHostAggregate hostAggregate = metricAggregate.getValue(); + byte[] uuid = metadataManagerInstance.getUuid(metric); + if (uuid == null) { + LOG.error("Error computing UUID for metric. 
Cannot write metric : " + metric.toString()); + continue; + } rowCount++; stmt.clearParameters(); - stmt.setString(1, metric.getMetricName()); - stmt.setString(2, metric.getHostName()); - stmt.setString(3, metric.getAppId()); - stmt.setString(4, metric.getInstanceId()); - stmt.setLong(5, metric.getTimestamp()); - stmt.setString(6, metric.getType()); - stmt.setDouble(7, hostAggregate.getSum()); - stmt.setDouble(8, hostAggregate.getMax()); - stmt.setDouble(9, hostAggregate.getMin()); - stmt.setDouble(10, hostAggregate.getNumberOfSamples()); + stmt.setBytes(1, uuid); + stmt.setLong(2, metric.getTimestamp()); + stmt.setDouble(3, hostAggregate.getSum()); + stmt.setDouble(4, hostAggregate.getMax()); + stmt.setDouble(5, hostAggregate.getMin()); + stmt.setDouble(6, hostAggregate.getNumberOfSamples()); try { stmt.executeUpdate(); @@ -1372,16 +1371,18 @@ public class PhoenixHBaseAccessor { } rowCount++; + byte[] uuid = metadataManagerInstance.getUuid(clusterMetric); + if (uuid == null) { + LOG.error("Error computing UUID for metric. 
Cannot write metrics : " + clusterMetric.toString()); + continue; + } stmt.clearParameters(); - stmt.setString(1, clusterMetric.getMetricName()); - stmt.setString(2, clusterMetric.getAppId()); - stmt.setString(3, clusterMetric.getInstanceId()); - stmt.setLong(4, clusterMetric.getTimestamp()); - stmt.setString(5, clusterMetric.getType()); - stmt.setDouble(6, aggregate.getSum()); - stmt.setInt(7, aggregate.getNumberOfHosts()); - stmt.setDouble(8, aggregate.getMax()); - stmt.setDouble(9, aggregate.getMin()); + stmt.setBytes(1, uuid); + stmt.setLong(2, clusterMetric.getTimestamp()); + stmt.setDouble(3, aggregate.getSum()); + stmt.setInt(4, aggregate.getNumberOfHosts()); + stmt.setDouble(5, aggregate.getMax()); + stmt.setDouble(6, aggregate.getMin()); try { stmt.executeUpdate(); @@ -1458,17 +1459,20 @@ public class PhoenixHBaseAccessor { "aggregate = " + aggregate); } + byte[] uuid = metadataManagerInstance.getUuid(clusterMetric); + if (uuid == null) { + LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString()); + continue; + } + rowCount++; stmt.clearParameters(); - stmt.setString(1, clusterMetric.getMetricName()); - stmt.setString(2, clusterMetric.getAppId()); - stmt.setString(3, clusterMetric.getInstanceId()); - stmt.setLong(4, clusterMetric.getTimestamp()); - stmt.setString(5, clusterMetric.getType()); - stmt.setDouble(6, aggregate.getSum()); - stmt.setLong(7, aggregate.getNumberOfSamples()); - stmt.setDouble(8, aggregate.getMax()); - stmt.setDouble(9, aggregate.getMin()); + stmt.setBytes(1, uuid); + stmt.setLong(2, clusterMetric.getTimestamp()); + stmt.setDouble(3, aggregate.getSum()); + stmt.setLong(4, aggregate.getNumberOfSamples()); + stmt.setDouble(5, aggregate.getMax()); + stmt.setDouble(6, aggregate.getMin()); try { stmt.executeUpdate(); @@ -1556,21 +1560,23 @@ public class PhoenixHBaseAccessor { * One time save of metadata when discovering topology during aggregation. 
* @throws SQLException */ - public void saveHostAppsMetadata(Map<String, Set<String>> hostedApps) throws SQLException { + public void saveHostAppsMetadata(Map<String, TimelineMetricHostMetadata> hostMetadata) throws SQLException { Connection conn = getConnection(); PreparedStatement stmt = null; try { stmt = conn.prepareStatement(UPSERT_HOSTED_APPS_METADATA_SQL); int rowCount = 0; - for (Map.Entry<String, Set<String>> hostedAppsEntry : hostedApps.entrySet()) { + for (Map.Entry<String, TimelineMetricHostMetadata> hostedAppsEntry : hostMetadata.entrySet()) { + TimelineMetricHostMetadata timelineMetricHostMetadata = hostedAppsEntry.getValue(); if (LOG.isTraceEnabled()) { LOG.trace("HostedAppsMetadata: " + hostedAppsEntry); } stmt.clearParameters(); stmt.setString(1, hostedAppsEntry.getKey()); - stmt.setString(2, StringUtils.join(hostedAppsEntry.getValue(), ",")); + stmt.setBytes(2, timelineMetricHostMetadata.getUuid()); + stmt.setString(3, StringUtils.join(timelineMetricHostMetadata.getHostedApps(), ",")); try { stmt.executeUpdate(); rowCount++; @@ -1674,15 +1680,21 @@ public class PhoenixHBaseAccessor { + ", seriesStartTime = " + metadata.getSeriesStartTime() ); } - - stmt.clearParameters(); - stmt.setString(1, metadata.getMetricName()); - stmt.setString(2, metadata.getAppId()); - stmt.setString(3, metadata.getUnits()); - stmt.setString(4, metadata.getType()); - stmt.setLong(5, metadata.getSeriesStartTime()); - stmt.setBoolean(6, metadata.isSupportsAggregates()); - stmt.setBoolean(7, metadata.isWhitelisted()); + try { + stmt.clearParameters(); + stmt.setString(1, metadata.getMetricName()); + stmt.setString(2, metadata.getAppId()); + stmt.setString(3, metadata.getInstanceId()); + stmt.setBytes(4, metadata.getUuid()); + stmt.setString(5, metadata.getUnits()); + stmt.setString(6, metadata.getType()); + stmt.setLong(7, metadata.getSeriesStartTime()); + stmt.setBoolean(8, metadata.isSupportsAggregates()); + stmt.setBoolean(9, metadata.isWhitelisted()); + } catch 
(Exception e) { + LOG.error("Exception in saving metric metadata entry. "); + continue; + } try { stmt.executeUpdate(); @@ -1713,8 +1725,8 @@ public class PhoenixHBaseAccessor { } } - public Map<String, Set<String>> getHostedAppsMetadata() throws SQLException { - Map<String, Set<String>> hostedAppMap = new HashMap<>(); + public Map<String, TimelineMetricHostMetadata> getHostedAppsMetadata() throws SQLException { + Map<String, TimelineMetricHostMetadata> hostedAppMap = new HashMap<>(); Connection conn = getConnection(); PreparedStatement stmt = null; ResultSet rs = null; @@ -1724,8 +1736,9 @@ public class PhoenixHBaseAccessor { rs = stmt.executeQuery(); while (rs.next()) { - hostedAppMap.put(rs.getString("HOSTNAME"), - new HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ",")))); + TimelineMetricHostMetadata hostMetadata = new TimelineMetricHostMetadata(new HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ",")))); + hostMetadata.setUuid(rs.getBytes("UUID")); + hostedAppMap.put(rs.getString("HOSTNAME"), hostMetadata); } } finally { @@ -1816,9 +1829,11 @@ public class PhoenixHBaseAccessor { while (rs.next()) { String metricName = rs.getString("METRIC_NAME"); String appId = rs.getString("APP_ID"); + String instanceId = rs.getString("INSTANCE_ID"); TimelineMetricMetadata metadata = new TimelineMetricMetadata( metricName, appId, + instanceId, rs.getString("UNITS"), rs.getString("TYPE"), rs.getLong("START_TIME"), @@ -1826,8 +1841,9 @@ public class PhoenixHBaseAccessor { rs.getBoolean("IS_WHITELISTED") ); - TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId); + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(metricName, appId, instanceId); metadata.setIsPersisted(true); // Always true on retrieval + metadata.setUuid(rs.getBytes("UUID")); metadataMap.put(key, metadata); } @@ -1858,4 +1874,8 @@ public class PhoenixHBaseAccessor { return metadataMap; } + public void 
setMetadataInstance(TimelineMetricMetadataManager metadataManager) { + this.metadataManagerInstance = metadataManager; + TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index de33bd1..2060867 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -304,6 +304,9 @@ public class TimelineMetricConfiguration { public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES = "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles"; + public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY = + "timeline.metrics.uuid.gen.strategy"; + public static final String HOST_APP_ID = "HOST"; public static final String DEFAULT_INSTANCE_PORT = "12001"; http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index d052d54..dab4494 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; + import java.io.IOException; import java.sql.SQLException; import java.util.List; @@ -98,9 +100,11 @@ public interface TimelineMetricStore { */ Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException; - /** - * Return a list of known live collector nodes - * @return [ hostname ] - */ + Map<String, TimelineMetricMetadataKey> getUuids() throws SQLException, IOException; + + /** + * Return a list of known live collector nodes + * @return [ hostname ] + */ List<String> getLiveInstances(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java index 1446ec2..04cf422 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java @@ -167,11 +167,4 @@ public class TimelineMetricsFilter { return false; } - public static void addToWhitelist(String metricName) { - - if (StringUtils.isNotEmpty(metricName)) { - whitelistedMetrics.add(metricName); - } - } - } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index cb131d3..83f2392 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -272,7 
+272,8 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg conn.commit(); LOG.info(rows + " row(s) updated in aggregation."); - downsample(conn, startTime, endTime); + //TODO : Fix downsampling after UUID change. + //downsample(conn, startTime, endTime); } else { rs = stmt.executeQuery(); } @@ -280,7 +281,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg aggregate(rs, startTime, endTime); - } catch (SQLException | IOException e) { + } catch (Exception e) { LOG.error("Exception during aggregating metrics.", e); success = false; } finally { @@ -455,25 +456,29 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg * @return */ protected String getDownsampledMetricSkipClause() { - if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) { - return StringUtils.EMPTY; - } - - StringBuilder sb = new StringBuilder(); - - for (int i = 0; i < downsampleMetricPatterns.size(); i++) { - sb.append(" METRIC_NAME"); - sb.append(" NOT"); - sb.append(" LIKE "); - sb.append("'" + downsampleMetricPatterns.get(i) + "'"); - if (i < downsampleMetricPatterns.size() - 1) { - sb.append(" AND "); - } - } - - sb.append(" AND "); - return sb.toString(); + //TODO Fix downsampling for UUID change. 
+ return StringUtils.EMPTY; + +// if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) { +// return StringUtils.EMPTY; +// } +// +// StringBuilder sb = new StringBuilder(); +// +// for (int i = 0; i < downsampleMetricPatterns.size(); i++) { +// sb.append(" METRIC_NAME"); +// sb.append(" NOT"); +// sb.append(" LIKE "); +// sb.append("'" + downsampleMetricPatterns.get(i) + "'"); +// +// if (i < downsampleMetricPatterns.size() - 1) { +// sb.append(" AND "); +// } +// } +// +// sb.append(" AND "); +// return sb.toString(); } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java index b7d9110..6e793e1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java @@ -22,15 +22,13 @@ public class TimelineClusterMetric { private String appId; private String instanceId; private long timestamp; - private String type; public TimelineClusterMetric(String metricName, String appId, String instanceId, - long timestamp, String type) { + long timestamp) { this.metricName = metricName; this.appId = appId; this.instanceId = instanceId; this.timestamp = timestamp; - this.type = 
type; } public String getMetricName() { @@ -49,8 +47,6 @@ public class TimelineClusterMetric { return timestamp; } - public String getType() { return type; } - @Override public boolean equals(Object o) { if (this == o) return true; http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index 2eb3553..081e610 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -95,6 +95,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( @@ -128,6 +129,7 @@ public class TimelineMetricAggregatorFactory { return new TimelineMetricHostAggregator( METRIC_RECORD_MINUTE, + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -145,6 +147,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator 
createTimelineMetricAggregatorHourly (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( @@ -178,6 +181,7 @@ public class TimelineMetricAggregatorFactory { return new TimelineMetricHostAggregator( METRIC_RECORD_HOURLY, + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -195,6 +199,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineMetricAggregatorDaily (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( @@ -228,6 +233,7 @@ public class TimelineMetricAggregatorFactory { return new TimelineMetricHostAggregator( METRIC_RECORD_DAILY, + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -291,6 +297,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( @@ -326,6 +333,7 @@ public class TimelineMetricAggregatorFactory { return new TimelineMetricClusterAggregator( METRIC_AGGREGATE_MINUTE, + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -344,6 +352,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( @@ -379,6 +388,7 @@ public class TimelineMetricAggregatorFactory { return 
new TimelineMetricClusterAggregator( METRIC_AGGREGATE_HOURLY, + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -397,6 +407,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( @@ -432,6 +443,7 @@ public class TimelineMetricAggregatorFactory { return new TimelineMetricClusterAggregator( METRIC_AGGREGATE_DAILY, + metadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java index 9eaf456..55104de 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; import 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; @@ -48,14 +49,14 @@ public class TimelineMetricAppAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class); // Lookup to check candidacy of an app private final List<String> appIdsToAggregate; - private final Map<String, Set<String>> hostedAppsMap; + private final Map<String, TimelineMetricHostMetadata> hostMetadata; Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>(); TimelineMetricMetadataManager metadataManagerInstance; public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager, Configuration metricsConf) { appIdsToAggregate = getAppIdsForHostAggregation(metricsConf); - hostedAppsMap = metadataManager.getHostedAppsCache(); + hostMetadata = metadataManager.getHostedAppsCache(); metadataManagerInstance = metadataManager; LOG.info("AppIds configured for aggregation: " + appIdsToAggregate); } @@ -95,17 +96,20 @@ public class TimelineMetricAppAggregator { // If metric is a host metric and host has apps on it if (appId.equalsIgnoreCase(HOST_APP_ID)) { // Candidate metric, update app aggregates - if (hostedAppsMap.containsKey(hostname)) { + if (hostMetadata.containsKey(hostname)) { updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue); } } else { // Build the hostedapps map if not a host metric // Check app candidacy for host aggregation if (appIdsToAggregate.contains(appId)) { - Set<String> appIds = hostedAppsMap.get(hostname); - if 
(appIds == null) { + TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(hostname); + Set<String> appIds; + if (timelineMetricHostMetadata == null) { appIds = new HashSet<>(); - hostedAppsMap.put(hostname, appIds); + hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds)); + } else { + appIds = timelineMetricHostMetadata.getHostedApps(); } if (!appIds.contains(appId)) { appIds.add(appId); @@ -127,20 +131,20 @@ public class TimelineMetricAppAggregator { return; } - TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID); - Set<String> apps = hostedAppsMap.get(hostname); + TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId()); + Set<String> apps = hostMetadata.get(hostname).getHostedApps(); for (String appId : apps) { if (appIdsToAggregate.contains(appId)) { appKey.setAppId(appId); TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); if (appMetadata == null) { - TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID); + TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId()); TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key); if (hostMetricMetadata != null) { TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(clusterMetric.getMetricName(), - appId, hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(), + appId, clusterMetric.getInstanceId(), hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(), hostMetricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId)); 
metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata); } @@ -151,9 +155,7 @@ public class TimelineMetricAppAggregator { new TimelineClusterMetric(clusterMetric.getMetricName(), appId, clusterMetric.getInstanceId(), - clusterMetric.getTimestamp(), - clusterMetric.getType() - ); + clusterMetric.getTimestamp()); MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric); http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 74d4013..0f6dd79 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -23,6 +23,7 @@ import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -37,10 +38,11 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { - private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true); + private final TimelineMetricReadHelper readHelper; private final boolean isClusterPrecisionInputTable; public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName, + TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -56,6 +58,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator hostAggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay, haController); isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME); + readHelper = new TimelineMetricReadHelper(metricMetadataManager, true); } @Override @@ -71,9 +74,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator } condition.setStatement(sqlStr); - condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("UUID"); condition.addOrderByColumn("SERVER_TIME"); return condition; } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index 34b1f9b..cae7263 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
*/ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator { public Long timeSliceIntervalMillis; - private TimelineMetricReadHelper timelineMetricReadHelper = new TimelineMetricReadHelper(true); + private TimelineMetricReadHelper timelineMetricReadHelper; // Aggregator to perform app-level aggregates for host metrics private final TimelineMetricAppAggregator appAggregator; // 1 minute client side buffering adjustment @@ -64,6 +64,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre private final boolean interpolationEnabled; private TimelineMetricMetadataManager metadataManagerInstance; private String skipAggrPatternStrings; + private final static String liveHostsMetricName = "live_hosts"; public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName, TimelineMetricMetadataManager metadataManager, @@ -88,6 +89,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000")); this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true")); this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS); + this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true); } @Override @@ -127,10 +129,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre condition.setStatement(String.format(GET_METRIC_SQL, getQueryHint(startTime), METRICS_RECORD_TABLE_NAME)); // Retaining order of the row-key avoids client side merge sort. 
- condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); + condition.addOrderByColumn("UUID"); condition.addOrderByColumn("SERVER_TIME"); return condition; } @@ -212,7 +211,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, TimelineMetric metric, List<Long[]> timeSlices) { // Create time slices - TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId()); + TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId()); TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) { @@ -285,8 +284,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), - timestamp, - timelineMetric.getType()); + timestamp); if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) { Double newValue = metric.getValue(); @@ -346,8 +344,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), - entry.getKey(), - timelineMetric.getType()); + entry.getKey()); timelineClusterMetricMap.put(clusterMetric, interpolatedValue); } else { @@ -404,8 +401,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), - timeSlice[1], - timelineMetric.getType()); + timeSlice[1]); LOG.debug("Interpolated value : " + interpolatedValue); 
timelineClusterMetricMap.put(clusterMetric, interpolatedValue); @@ -435,13 +431,15 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) { TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric( - "live_hosts", appHostsEntry.getKey(), null, timestamp, null); + liveHostsMetricName, appHostsEntry.getKey(), null, timestamp); Integer numOfHosts = appHostsEntry.getValue().intValue(); MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate( (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts); + metadataManagerInstance.getUuid(timelineClusterMetric); + aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate); } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index a17433b..8f941e1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -25,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -38,9 +39,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); + TimelineMetricReadHelper readHelper; public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName, + TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -54,6 +56,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName, outputTableName, nativeTimeRangeDelay, haController); + readHelper = new TimelineMetricReadHelper(metricMetadataManager, false); } @Override @@ -74,11 +77,8 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, getQueryHint(startTime), tableName)); // Retaining order of the row-key avoids client side merge sort. 
- condition.addOrderByColumn("METRIC_NAME"); - condition.addOrderByColumn("HOSTNAME"); + condition.addOrderByColumn("UUID"); condition.addOrderByColumn("SERVER_TIME"); - condition.addOrderByColumn("APP_ID"); - condition.addOrderByColumn("INSTANCE_ID"); return condition; } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java index 672f85f..c8b5728 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java @@ -23,16 +23,17 @@ import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Map; import java.util.TreeMap; public class TimelineMetricReadHelper { private boolean ignoreInstance = 
false; + private TimelineMetricMetadataManager metadataManagerInstance = null; public TimelineMetricReadHelper() {} @@ -40,6 +41,15 @@ public class TimelineMetricReadHelper { this.ignoreInstance = ignoreInstance; } + public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager) { + this.metadataManagerInstance = timelineMetricMetadataManager; + } + + public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager, boolean ignoreInstance) { + this.metadataManagerInstance = timelineMetricMetadataManager; + this.ignoreInstance = ignoreInstance; + } + public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); @@ -51,15 +61,16 @@ public class TimelineMetricReadHelper { public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs, Function f) throws SQLException, IOException { + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); Function function = (f != null) ? 
f : Function.DEFAULT_VALUE_FUNCTION; SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( - rs.getString("METRIC_NAME") + function.getSuffix(), - rs.getString("APP_ID"), - rs.getString("INSTANCE_ID"), - rs.getString("HOSTNAME"), - rs.getLong("SERVER_TIME"), + timelineMetric.getMetricName() + function.getSuffix(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timelineMetric.getHostName(), rs.getLong("SERVER_TIME"), - rs.getString("UNITS") + rs.getLong("SERVER_TIME") ); double value; @@ -91,16 +102,14 @@ public class TimelineMetricReadHelper { */ public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) throws SQLException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - if (!ignoreInstance) { - metric.setInstanceId(rs.getString("INSTANCE_ID")); + + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid); + if (ignoreInstance) { + metric.setInstanceId(null); } - metric.setHostName(rs.getString("HOSTNAME")); metric.setTimestamp(rs.getLong("SERVER_TIME")); metric.setStartTime(rs.getLong("START_TIME")); - metric.setType(rs.getString("UNITS")); return metric; } @@ -130,14 +139,16 @@ public class TimelineMetricReadHelper { return agg; } - public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException { + + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); + return new TimelineClusterMetric( - rs.getString("METRIC_NAME"), - rs.getString("APP_ID"), - ignoreInstance ? null : rs.getString("INSTANCE_ID"), - rs.getLong("SERVER_TIME"), - rs.getString("UNITS")); + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + ignoreInstance ? 
null : timelineMetric.getInstanceId(), + rs.getLong("SERVER_TIME")); } public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) @@ -154,14 +165,8 @@ public class TimelineMetricReadHelper { public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) throws SQLException, IOException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setHostName(rs.getString("HOSTNAME")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setType(rs.getString("UNITS")); - return metric; + byte[] uuid = rs.getBytes("UUID"); + return metadataManagerInstance.getMetricFromUuid(uuid); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java new file mode 100644 index 0000000..06e9279 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricHostMetadata.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery; + +import java.util.HashSet; +import java.util.Set; + +public class TimelineMetricHostMetadata { + private Set<String> hostedApps = new HashSet<>(); + private byte[] uuid; + + // Default constructor + public TimelineMetricHostMetadata() { + } + + public TimelineMetricHostMetadata(Set<String> hostedApps) { + this.hostedApps = hostedApps; + } + + public Set<String> getHostedApps() { + return hostedApps; + } + + public void setHostedApps(Set<String> hostedApps) { + this.hostedApps = hostedApps; + } + + public byte[] getUuid() { + return uuid; + } + + public void setUuid(byte[] uuid) { + this.uuid = uuid; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/041e4e9a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java index 
504b502..6aeb2dd 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataKey.java
@@ -17,13 +17,20 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery;
 
+import org.apache.commons.lang3.StringUtils;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
 public class TimelineMetricMetadataKey {
   String metricName;
   String appId;
+  String instanceId;
 
-  public TimelineMetricMetadataKey(String metricName, String appId) {
+  public TimelineMetricMetadataKey(String metricName, String appId, String instanceId) {
     this.metricName = metricName;
     this.appId = appId;
+    this.instanceId = instanceId;
   }
 
   public String getMetricName() {
@@ -34,6 +41,10 @@ public class TimelineMetricMetadataKey {
     return appId;
   }
 
+  public String getInstanceId() {
+    return instanceId;
+  }
+
   public void setAppId(String appId) {
     this.appId = appId;
   }
@@ -46,15 +57,24 @@ public class TimelineMetricMetadataKey {
     TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o;
 
     if (!metricName.equals(that.metricName)) return false;
-    return !(appId != null ? !appId.equals(that.appId) : that.appId != null);
-
+    if (appId != null ? !appId.equals(that.appId) : that.appId != null) return false; // null-safe: hashCode() below tolerates a null appId, so equals() must not throw on one
+    return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId));
   }
 
   @Override
   public int hashCode() {
     int result = metricName.hashCode();
     result = 31 * result + (appId != null ? appId.hashCode() : 0);
+    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
     return result;
   }
 
+  @Override
+  public String toString() {
+    return "TimelineMetricMetadataKey{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      '}';
+  }
 }
