Repository: ambari Updated Branches: refs/heads/trunk 70f1170b0 -> 102b47736
Revert "AMBARI-8872. Support point in time queries. Breaks dashboard graphs." This reverts commit 9bf9034a5c2481a8b40befab8c3713dcd3b6f584. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/102b4773 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/102b4773 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/102b4773 Branch: refs/heads/trunk Commit: 102b47736e9f721baa6bac434cd58bfeaf105aff Parents: 70f1170 Author: Siddharth Wagle <[email protected]> Authored: Tue Dec 23 14:43:58 2014 -0800 Committer: Siddharth Wagle <[email protected]> Committed: Tue Dec 23 14:43:58 2014 -0800 ---------------------------------------------------------------------- .../metrics/timeline/PhoenixHBaseAccessor.java | 473 +++++++----------- .../metrics/timeline/PhoenixTransactSQL.java | 491 +++++-------------- .../timeline/TimelineMetricAggregator.java | 3 +- .../TimelineMetricClusterAggregator.java | 3 +- .../TimelineMetricClusterAggregatorHourly.java | 3 +- .../metrics/timeline/ITClusterAggregator.java | 5 +- .../metrics/timeline/ITMetricAggregator.java | 7 +- .../timeline/TestPhoenixTransactSQL.java | 21 +- .../metrics/timeline/AMSPropertyProvider.java | 76 ++- .../timeline/AMSPropertyProviderTest.java | 92 +--- 10 files changed, 341 insertions(+), 833 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java index 4b04ba9..b5226ee 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -34,7 +35,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,7 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; @@ -135,6 +134,7 @@ public class PhoenixHBaseAccessor { } } + /** * Get JDBC connection to HBase store. Assumption is that the hbase * configuration is present on the classpath and loaded by the caller into @@ -148,28 +148,13 @@ public class PhoenixHBaseAccessor { return dataSource.getConnection(); } - private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs) - throws SQLException, IOException { - TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); - metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS"))); - - return metric; + public static Map readMetricFromJSON(String json) throws IOException { + return mapper.readValue(json, metricValuesTypeRef); } + @SuppressWarnings("unchecked") static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) throws SQLException, IOException { - TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); - Map<Long, Double> sortedByTimeMetrics = - new TreeMap<Long, Double>(readMetricFromJSON(rs.getString("METRICS"))); - metric.setMetricValues(sortedByTimeMetrics); - return metric; - } - - /** - * Returns common part of timeline metrics record without the values. - */ - private static TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) - throws SQLException { TimelineMetric metric = new TimelineMetric(); metric.setMetricName(rs.getString("METRIC_NAME")); metric.setAppId(rs.getString("APP_ID")); @@ -178,23 +163,12 @@ public class PhoenixHBaseAccessor { metric.setTimestamp(rs.getLong("SERVER_TIME")); metric.setStartTime(rs.getLong("START_TIME")); metric.setType(rs.getString("UNITS")); + Map<Long, Double> sortedByTimeMetrics = + new TreeMap<Long, Double>((Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS"))); + metric.setMetricValues(sortedByTimeMetrics); return metric; } - private static Map<Long, Double> readLastMetricValueFromJSON(String json) - throws IOException { - Map<Long, Double> values = readMetricFromJSON(json); - Long lastTimeStamp = Collections.max(values.keySet()); - - return Collections.singletonMap(lastTimeStamp, values.get(lastTimeStamp)); - } - - @SuppressWarnings("unchecked") - public static Map<Long, Double> readMetricFromJSON(String json) - throws IOException { - return (Map<Long, Double>) mapper.readValue(json, metricValuesTypeRef); - } - static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) throws SQLException, IOException { TimelineMetric metric = new TimelineMetric(); @@ -343,11 +317,9 @@ public class PhoenixHBaseAccessor { for (TimelineMetric metric : timelineMetrics) { metricRecordStmt.clearParameters(); - if (LOG.isTraceEnabled()) { - LOG.trace("host: " + metric.getHostName() + ", " + - "metricName = " + metric.getMetricName() + ", " + - "values: " + metric.getMetricValues()); - } + LOG.trace("host: " + metric.getHostName() + ", " + + "metricName = " + metric.getMetricName() + ", " + + "values: " + metric.getMetricValues()); Aggregator agg = new Aggregator(); double[] aggregates = agg.calculateAggregates( metric.getMetricValues()); @@ -394,32 +366,31 @@ public class PhoenixHBaseAccessor { } } + @SuppressWarnings("unchecked") public TimelineMetrics getMetricRecords(final Condition condition) throws SQLException, IOException { - verifyCondition(condition); + if (condition.isEmpty()) { + throw new SQLException("No filter criteria specified."); + } Connection conn = getConnection(); PreparedStatement stmt = null; TimelineMetrics metrics = new TimelineMetrics(); try { - //get latest - if(condition.isPointInTime()){ - stmt = getLatestMetricRecords(condition, conn, metrics); - } else { - stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - TimelineMetric metric = getTimelineMetricFromResultSet(rs); + ResultSet rs = stmt.executeQuery(); - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); - } else { - metrics.getMetrics().add(metric); - } + while (rs.next()) { + TimelineMetric metric = getTimelineMetricFromResultSet(rs); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); } } @@ -439,221 +410,174 @@ public class PhoenixHBaseAccessor { } } } - - LOG.info("Metrics records size: " + metrics.getMetrics().size()); return metrics; } - private PreparedStatement getLatestMetricRecords( - Condition condition, Connection conn, TimelineMetrics metrics) - throws SQLException, IOException { - PreparedStatement stmt = null; - SplitByMetricNamesCondition splitCondition = - new SplitByMetricNamesCondition(condition); - - for (String metricName: splitCondition.getOriginalMetricNames()) { - splitCondition.setCurrentMetric(metricName); - stmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, - splitCondition); - - ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - TimelineMetric metric = getLastTimelineMetricFromResultSet(rs); - metrics.getMetrics().add(metric); - } - } + public void saveHostAggregateRecords(Map<TimelineMetric, + MetricHostAggregate> hostAggregateMap, String phoenixTableName) + throws SQLException { - return stmt; - } + if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) { + Connection conn = getConnection(); + PreparedStatement stmt = null; - /** - * Get metrics aggregated across hosts. - * - * @param condition @Condition - * @return @TimelineMetrics - * @throws SQLException - */ - public TimelineMetrics getAggregateMetricRecords(final Condition condition) - throws SQLException { + long start = System.currentTimeMillis(); + int rowCount = 0; - verifyCondition(condition); + try { + stmt = conn.prepareStatement( + String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName)); + + for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate : + hostAggregateMap.entrySet()) { + + TimelineMetric metric = metricAggregate.getKey(); + MetricHostAggregate hostAggregate = metricAggregate.getValue(); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, metric.getMetricName()); + stmt.setString(2, metric.getHostName()); + stmt.setString(3, metric.getAppId()); + stmt.setString(4, metric.getInstanceId()); + stmt.setLong(5, metric.getTimestamp()); + stmt.setString(6, metric.getType()); + stmt.setDouble(7, hostAggregate.getSum()); + stmt.setDouble(8, hostAggregate.getMax()); + stmt.setDouble(9, hostAggregate.getMin()); + stmt.setDouble(10, hostAggregate.getNumberOfSamples()); + + try { + // TODO: Why this exception is swallowed + stmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error(sql); + } - Connection conn = getConnection(); - PreparedStatement stmt = null; - TimelineMetrics metrics = new TimelineMetrics(); + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } - try { - //get latest - if(condition.isPointInTime()) { - stmt = getLatestAggregateMetricRecords(condition, conn, metrics); - } else { - stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); + } - ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs); + conn.commit(); - if (condition.isGrouped()) { - metrics.addOrMergeTimelineMetric(metric); - } else { - metrics.getMetrics().add(metric); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore } } - } - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } } } - } - - LOG.info("Aggregate records size: " + metrics.getMetrics().size()); - return metrics; - } - private PreparedStatement getLatestAggregateMetricRecords( - Condition condition, Connection conn, TimelineMetrics metrics) - throws SQLException { + long end = System.currentTimeMillis(); - PreparedStatement stmt = null; - SplitByMetricNamesCondition splitCondition = - new SplitByMetricNamesCondition(condition); - - for (String metricName: splitCondition.getOriginalMetricNames()) { - - splitCondition.setCurrentMetric(metricName); - stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, - splitCondition); - - ResultSet rs = stmt.executeQuery(); - while (rs.next()) { - TimelineMetric metric = getAggregateTimelineMetricFromResultSet(rs); - metrics.getMetrics().add(metric); + if ((end - start) > 60000l) { + LOG.info("Time to save map: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getClass()); } } - - return stmt; - } - - private TimelineMetric getAggregateTimelineMetricFromResultSet( - ResultSet rs) throws SQLException { - TimelineMetric metric = new TimelineMetric(); - metric.setMetricName(rs.getString("METRIC_NAME")); - metric.setAppId(rs.getString("APP_ID")); - metric.setInstanceId(rs.getString("INSTANCE_ID")); - metric.setTimestamp(rs.getLong("SERVER_TIME")); - metric.setStartTime(rs.getLong("SERVER_TIME")); - Map<Long, Double> valueMap = Collections.singletonMap( - rs.getLong("SERVER_TIME"), - rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT")); - metric.setMetricValues(valueMap); - - return metric; - } - - private void verifyCondition(Condition condition) throws SQLException { - if (condition.isEmpty()) { - throw new SQLException("No filter criteria specified."); - } } - public void saveHostAggregateRecords(Map<TimelineMetric, - MetricHostAggregate> hostAggregateMap, String phoenixTableName) + /** + * Save Metric aggregate records. + * + * @throws SQLException + */ + public void saveClusterAggregateRecords( + Map<TimelineClusterMetric, MetricClusterAggregate> records) throws SQLException { - if (hostAggregateMap == null || hostAggregateMap.isEmpty()) { - LOG.debug("Empty aggregate records."); - return; - } - - Connection conn = getConnection(); - PreparedStatement stmt = null; - - long start = System.currentTimeMillis(); - int rowCount = 0; + if (records == null || records.isEmpty()) { + LOG.debug("Empty aggregate records."); + return; + } - try { - stmt = conn.prepareStatement( - String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName)); + long start = System.currentTimeMillis(); - for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate : - hostAggregateMap.entrySet()) { + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL); + int rowCount = 0; - TimelineMetric metric = metricAggregate.getKey(); - MetricHostAggregate hostAggregate = metricAggregate.getValue(); + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> + aggregateEntry : records.entrySet()) { + TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); + MetricClusterAggregate aggregate = aggregateEntry.getValue(); - rowCount++; - stmt.clearParameters(); - stmt.setString(1, metric.getMetricName()); - stmt.setString(2, metric.getHostName()); - stmt.setString(3, metric.getAppId()); - stmt.setString(4, metric.getInstanceId()); - stmt.setLong(5, metric.getTimestamp()); - stmt.setString(6, metric.getType()); - stmt.setDouble(7, hostAggregate.getSum()); - stmt.setDouble(8, hostAggregate.getMax()); - stmt.setDouble(9, hostAggregate.getMin()); - stmt.setDouble(10, hostAggregate.getNumberOfSamples()); + LOG.trace("clusterMetric = " + clusterMetric + ", " + + "aggregate = " + aggregate); - try { - stmt.executeUpdate(); - } catch (SQLException sql) { - LOG.error(sql); - } + rowCount++; + stmt.clearParameters(); + stmt.setString(1, clusterMetric.getMetricName()); + stmt.setString(2, clusterMetric.getAppId()); + stmt.setString(3, clusterMetric.getInstanceId()); + stmt.setLong(4, clusterMetric.getTimestamp()); + stmt.setString(5, clusterMetric.getType()); + stmt.setDouble(6, aggregate.getSum()); + stmt.setInt(7, aggregate.getNumberOfHosts()); + stmt.setDouble(8, aggregate.getMax()); + stmt.setDouble(9, aggregate.getMin()); + + try { + stmt.executeUpdate(); + } catch (SQLException sql) { + // TODO: Why this exception is swallowed + LOG.error(sql); + } - if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { - conn.commit(); - rowCount = 0; + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } } - } - - conn.commit(); + conn.commit(); - } finally { - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } } } + long end = System.currentTimeMillis(); + if ((end - start) > 60000l) { + LOG.info("Time to save: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getName()); + } } - long end = System.currentTimeMillis(); - - if ((end - start) > 60000l) { - LOG.info("Time to save map: " + (end - start) + ", " + - "thread = " + Thread.currentThread().getClass()); - } - } - /** * Save Metric aggregate records. * * @throws SQLException */ - public void saveClusterAggregateRecords( - Map<TimelineClusterMetric, MetricClusterAggregate> records) + public void saveClusterAggregateHourlyRecords( + Map<TimelineClusterMetric, MetricHostAggregate> records, + String tableName) throws SQLException { - if (records == null || records.isEmpty()) { LOG.debug("Empty aggregate records."); return; @@ -664,18 +588,17 @@ public class PhoenixHBaseAccessor { Connection conn = getConnection(); PreparedStatement stmt = null; try { - stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL); + stmt = conn.prepareStatement(String.format + (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); int rowCount = 0; - for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> + for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) { TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); - MetricClusterAggregate aggregate = aggregateEntry.getValue(); + MetricHostAggregate aggregate = aggregateEntry.getValue(); - if (LOG.isTraceEnabled()) { - LOG.trace("clusterMetric = " + clusterMetric + ", " + - "aggregate = " + aggregate); - } + LOG.trace("clusterMetric = " + clusterMetric + ", " + + "aggregate = " + aggregate); rowCount++; stmt.clearParameters(); @@ -685,7 +608,8 @@ public class PhoenixHBaseAccessor { stmt.setLong(4, clusterMetric.getTimestamp()); stmt.setString(5, clusterMetric.getType()); stmt.setDouble(6, aggregate.getSum()); - stmt.setInt(7, aggregate.getNumberOfHosts()); +// stmt.setInt(7, aggregate.getNumberOfHosts()); + stmt.setLong(7, aggregate.getNumberOfSamples()); stmt.setDouble(8, aggregate.getMax()); stmt.setDouble(9, aggregate.getMin()); @@ -727,68 +651,48 @@ public class PhoenixHBaseAccessor { } } - /** - * Save Metric aggregate records. + * Get metrics aggregated across hosts. * + * @param condition @Condition + * @return @TimelineMetrics * @throws SQLException */ - public void saveClusterAggregateHourlyRecords( - Map<TimelineClusterMetric, MetricHostAggregate> records, - String tableName) + public TimelineMetrics getAggregateMetricRecords(final Condition condition) throws SQLException { - if (records == null || records.isEmpty()) { - LOG.debug("Empty aggregate records."); - return; - } - long start = System.currentTimeMillis(); + if (condition.isEmpty()) { + throw new SQLException("No filter criteria specified."); + } Connection conn = getConnection(); PreparedStatement stmt = null; - try { - stmt = conn.prepareStatement(String.format - (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); - int rowCount = 0; - - for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> - aggregateEntry : records.entrySet()) { - TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); - MetricHostAggregate aggregate = aggregateEntry.getValue(); - - if (LOG.isTraceEnabled()) { - LOG.trace("clusterMetric = " + clusterMetric + ", " + - "aggregate = " + aggregate); - } + TimelineMetrics metrics = new TimelineMetrics(); - rowCount++; - stmt.clearParameters(); - stmt.setString(1, clusterMetric.getMetricName()); - stmt.setString(2, clusterMetric.getAppId()); - stmt.setString(3, clusterMetric.getInstanceId()); - stmt.setLong(4, clusterMetric.getTimestamp()); - stmt.setString(5, clusterMetric.getType()); - stmt.setDouble(6, aggregate.getSum()); -// stmt.setInt(7, aggregate.getNumberOfHosts()); - stmt.setLong(7, aggregate.getNumberOfSamples()); - stmt.setDouble(8, aggregate.getMax()); - stmt.setDouble(9, aggregate.getMin()); + try { + stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); - try { - stmt.executeUpdate(); - } catch (SQLException sql) { - // we have no way to verify it works!!! - LOG.error(sql); - } + ResultSet rs = stmt.executeQuery(); - if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { - conn.commit(); - rowCount = 0; + while (rs.next()) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setStartTime(rs.getLong("SERVER_TIME")); + Map<Long, Double> valueMap = new HashMap<Long, Double>(); + valueMap.put(rs.getLong("SERVER_TIME"), + rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT")); + metric.setMetricValues(valueMap); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); } } - conn.commit(); - } finally { if (stmt != null) { try { @@ -805,10 +709,7 @@ public class PhoenixHBaseAccessor { } } } - long end = System.currentTimeMillis(); - if ((end - start) > 60000l) { - LOG.info("Time to save: " + (end - start) + ", " + - "thread = " + Thread.currentThread().getName()); - } + LOG.info("Aggregate records size: " + metrics.getMetrics().size()); + return metrics; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java index 421d533..fb02dc7 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixTransactSQL.java @@ -17,13 +17,11 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; -import com.sun.xml.bind.v2.util.QNameMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -34,6 +32,7 @@ import java.util.Set; public class PhoenixTransactSQL { static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class); + // TODO: Configurable TTL values /** * Create table to store individual metric records. */ @@ -207,10 +206,8 @@ public class PhoenixTransactSQL { public static final String DEFAULT_ENCODING = "FAST_DIFF"; public static final long NATIVE_TIME_RANGE_DELTA = 120000; // 2 minutes - /** - * Filter to optimize HBase scan by using file timestamps. This prevents + /** Filter to optimize HBase scan by using file timestamps. This prevents * a full table scan of metric records. - * * @return Phoenix Hint String */ public static String getNaiveTimeRangeHint(Long startTime, Long delta) { @@ -246,47 +243,33 @@ public class PhoenixTransactSQL { sb.append(" LIMIT ").append(condition.getLimit()); } - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); - } + LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); PreparedStatement stmt = connection.prepareStatement(sb.toString()); int pos = 1; if (condition.getMetricNames() != null) { for (; pos <= condition.getMetricNames().size(); pos++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); - } + LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); stmt.setString(pos, condition.getMetricNames().get(pos - 1)); } } if (condition.getHostname() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); - } + LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); stmt.setString(pos++, condition.getHostname()); } if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } + LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); stmt.setString(pos++, condition.getAppId()); } if (condition.getInstanceId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); - } + LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); stmt.setString(pos++, condition.getInstanceId()); } if (condition.getStartTime() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); - } + LOG.debug("Setting pos: " + pos + ", value: " + condition.getStartTime()); stmt.setLong(pos++, condition.getStartTime()); } if (condition.getEndTime() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); - } + LOG.debug("Setting pos: " + pos + ", value: " + condition.getEndTime()); stmt.setLong(pos, condition.getEndTime()); } if (condition.getFetchSize() != null) { @@ -297,80 +280,6 @@ public class PhoenixTransactSQL { } - public static PreparedStatement prepareGetLatestMetricSqlStmt( - Connection connection, Condition condition) throws SQLException { - - if (condition.isEmpty()) { - throw new IllegalArgumentException("Condition is empty."); - } - - if (condition.getMetricNames() == null - || condition.getMetricNames().size() == 0) { - throw new IllegalArgumentException("Point in time query without " + - "metric names not supported "); - } - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_METRIC_SQL, - "", - METRICS_RECORD_TABLE_NAME); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(); - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME "); - } - - sb.append(" LIMIT ").append(condition.getMetricNames().size()); - - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + sb.toString() + ", condition: " + condition); - } - PreparedStatement stmt = connection.prepareStatement(sb.toString()); - int pos = 1; - if (condition.getMetricNames() != null) { - //IGNORE condition limit, set one based on number of metric names - for (; pos <= condition.getMetricNames().size(); pos++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value = " + condition.getMetricNames().get(pos - 1)); - } - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - if (condition.getHostname() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getHostname()); - } - stmt.setString(pos++, condition.getHostname()); - } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getInstanceId()); - } - stmt.setString(pos++, condition.getInstanceId()); - } - - if (condition.getFetchSize() != null) { - stmt.setFetchSize(condition.getFetchSize()); - } - - return stmt; - } - public static PreparedStatement prepareGetAggregateSqlStmt( Connection connection, Condition condition) throws SQLException { @@ -389,9 +298,7 @@ public class PhoenixTransactSQL { String query = String.format(sb.toString(), PhoenixTransactSQL.getNaiveTimeRangeHint(condition.getStartTime(), NATIVE_TIME_RANGE_DELTA)); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL => " + query + ", condition => " + condition); - } + LOG.debug("SQL => " + query + ", condition => " + condition); PreparedStatement stmt = connection.prepareStatement(query); int pos = 1; if (condition.getMetricNames() != null) { @@ -416,87 +323,7 @@ public class PhoenixTransactSQL { return stmt; } - public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt( - Connection connection, Condition condition) throws SQLException { - - if (condition.isEmpty()) { - throw new IllegalArgumentException("Condition is empty."); - } - - if (condition.getMetricNames() == null - || condition.getMetricNames().size() == 0) { - throw new IllegalArgumentException("Point in time query without " + - "metric names not supported "); - } - - String stmtStr; - if (condition.getStatement() != null) { - stmtStr = condition.getStatement(); - } else { - stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL, ""); - } - - StringBuilder sb = new StringBuilder(stmtStr); - sb.append(" WHERE "); - sb.append(condition.getConditionClause()); - String orderByClause = condition.getOrderByClause(); - if (orderByClause != null) { - sb.append(orderByClause); - } else { - sb.append(" ORDER BY SERVER_TIME DESC, METRIC_NAME "); - } - - sb.append(" LIMIT ").append(condition.getMetricNames().size()); - - String query = sb.toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("SQL: " + query + ", condition: " + condition); - } - - PreparedStatement stmt = connection.prepareStatement(query); - int pos = 1; - if (condition.getMetricNames() != null) { - for (; pos <= condition.getMetricNames().size(); pos++) { - stmt.setString(pos, condition.getMetricNames().get(pos - 1)); - } - } - if (condition.getAppId() != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Setting pos: " + pos + ", value: " + condition.getAppId()); - } - stmt.setString(pos++, condition.getAppId()); - } - if (condition.getInstanceId() != null) { - stmt.setString(pos++, condition.getInstanceId()); - } - - return stmt; - } - - static interface Condition { - - boolean isEmpty(); - - List<String> getMetricNames(); - boolean isPointInTime(); - boolean isGrouped(); - void setStatement(String statement); - String getHostname(); - String getAppId(); - String getInstanceId(); - String getConditionClause(); - String getOrderByClause(); - String getStatement(); - Long getStartTime(); - Long getEndTime(); - Integer getLimit(); - Integer getFetchSize(); - void setFetchSize(Integer fetchSize); - void addOrderByColumn(String column); - void setNoLimit(); - } - - static class DefaultCondition implements Condition { + static class Condition { List<String> metricNames; String hostname; String appId; @@ -510,7 +337,7 @@ public class PhoenixTransactSQL { String statement; Set<String> orderByColumns = new LinkedHashSet<String>(); - DefaultCondition(List<String> metricNames, String hostname, String appId, + Condition(List<String> metricNames, String hostname, String appId, String instanceId, Long startTime, Long endTime, Integer limit, boolean grouped) { this.metricNames = metricNames; @@ -523,22 +350,22 @@ public class PhoenixTransactSQL { this.grouped = grouped; } - public String getStatement() { + String getStatement() { return statement; } - public void setStatement(String statement) { + void setStatement(String statement) { this.statement = statement; } - public List<String> getMetricNames() { + List<String> getMetricNames() { return metricNames == null || metricNames.isEmpty() ? null : metricNames; } String getMetricsClause() { StringBuilder sb = new StringBuilder("("); if (metricNames != null) { - for (String name : getMetricNames()) { + for (String name : metricNames) { if (sb.length() != 1) { sb.append(", "); } @@ -551,48 +378,61 @@ public class PhoenixTransactSQL { } } - public String getConditionClause() { + String getConditionClause() { StringBuilder sb = new StringBuilder(); boolean appendConjunction = false; if (getMetricNames() != null) { - if (appendConjunction) { - sb.append(" AND"); - } - sb.append("METRIC_NAME IN "); sb.append(getMetricsClause()); appendConjunction = true; } - - appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?"); - appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?"); - appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?"); - appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); - append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); - - return sb.toString(); - } - - protected static boolean append(StringBuilder sb, - boolean appendConjunction, - Object value, String str) { - if (value != null) { - if (appendConjunction) { - sb.append(" AND"); - } - - sb.append(str); + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getHostname() != null) { + sb.append(" HOSTNAME = ?"); appendConjunction = true; } - return appendConjunction; + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getAppId() != null) { + sb.append(" APP_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getInstanceId() != null) { + sb.append(" INSTANCE_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getStartTime() != null) { + sb.append(" SERVER_TIME >= ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + if (getEndTime() != null) { + sb.append(" SERVER_TIME < ?"); + } + return sb.toString(); } - public String getHostname() { + String getHostname() { return hostname == null || hostname.isEmpty() ? null : hostname; } - public String getAppId() { + String getAppId() { if (appId != null && !appId.isEmpty()) { if (!appId.equals("HOST")) { return appId.toLowerCase(); @@ -603,27 +443,22 @@ public class PhoenixTransactSQL { return null; } - public String getInstanceId() { + String getInstanceId() { return instanceId == null || instanceId.isEmpty() ? null : instanceId; } /** * Convert to millis. */ - public Long getStartTime() { - if (startTime == null) { - return null; - } else if (startTime < 9999999999l) { + Long getStartTime() { + if (startTime < 9999999999l) { return startTime * 1000; } else { return startTime; } } - public Long getEndTime() { - if (endTime == null) { - return null; - } + Long getEndTime() { if (endTime < 9999999999l) { return endTime * 1000; } else { @@ -631,26 +466,22 @@ public class PhoenixTransactSQL { } } - public void setNoLimit() { + void setNoLimit() { this.noLimit = true; } - public Integer getLimit() { + Integer getLimit() { if (noLimit) { return null; } return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit; } - public boolean isGrouped() { + boolean isGrouped() { return grouped; } - public boolean isPointInTime() { - return getStartTime() == null && getEndTime() == null; - } - - public boolean isEmpty() { + boolean isEmpty() { return (metricNames == null || metricNames.isEmpty()) && (hostname == null || hostname.isEmpty()) && (appId == null || appId.isEmpty()) @@ -659,19 +490,19 @@ public class PhoenixTransactSQL { && endTime == null; } - public Integer getFetchSize() { + Integer getFetchSize() { return fetchSize; } - public void setFetchSize(Integer fetchSize) { + void setFetchSize(Integer fetchSize) { this.fetchSize = fetchSize; } - public void addOrderByColumn(String column) { + void addOrderByColumn(String column) { orderByColumns.add(column); } - public String getOrderByClause() { + String getOrderByClause() { String orderByStr = " ORDER BY "; if (!orderByColumns.isEmpty()) { StringBuilder sb = new StringBuilder(orderByStr); @@ -704,172 +535,70 @@ public class PhoenixTransactSQL { } } - static class LikeCondition extends DefaultCondition { + static class LikeCondition extends Condition { LikeCondition(List<String> metricNames, String hostname, - String appId, String instanceId, Long startTime, - Long endTime, Integer limit, boolean grouped) { + String appId, String instanceId, Long startTime, + Long endTime, Integer limit, boolean grouped) { super(metricNames, hostname, appId, instanceId, startTime, endTime, - limit, grouped); + limit, grouped); } @Override - public String getConditionClause() { + String getConditionClause() { StringBuilder sb = new StringBuilder(); boolean appendConjunction = false; if (getMetricNames() != null) { sb.append("("); - for (String name : getMetricNames()) { + for (String name : metricNames) { if (sb.length() > 1) { sb.append(" OR "); } sb.append("METRIC_NAME LIKE ?"); } - sb.append(")"); appendConjunction = true; } - - appendConjunction = append(sb, appendConjunction, getHostname(), " HOSTNAME = ?"); - appendConjunction = append(sb, appendConjunction, getAppId(), " APP_ID = ?"); - appendConjunction = append(sb, appendConjunction, getInstanceId(), " INSTANCE_ID = ?"); - appendConjunction = append(sb, appendConjunction, getStartTime(), " SERVER_TIME >= ?"); - append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?"); - - return sb.toString(); - } - } - - static class SplitByMetricNamesCondition implements Condition { - private final Condition adaptee; - private String currentMetric; - - SplitByMetricNamesCondition(Condition condition){ - this.adaptee = condition; - } - - @Override - public boolean isEmpty() { - return adaptee.isEmpty(); - } - - @Override - public List<String> getMetricNames() { - return Collections.singletonList(currentMetric); - } - - @Override - public boolean isPointInTime() { - return adaptee.isPointInTime(); - } - - @Override - public boolean isGrouped() { - return adaptee.isGrouped(); - } - - @Override - public void setStatement(String statement) { - adaptee.setStatement(statement); - } - - @Override - public String getHostname() { - return adaptee.getHostname(); - } - - @Override - public String getAppId() { - return adaptee.getAppId(); - } - - @Override - public String getInstanceId() { - return adaptee.getInstanceId(); - } - - @Override - public String getConditionClause() { - StringBuilder sb = new StringBuilder(); - boolean appendConjunction = false; - - if (getMetricNames() != null) { - for (String name : getMetricNames()) { - if (sb.length() > 1) { - sb.append(" OR "); - } - sb.append("METRIC_NAME = ?"); - } - + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getHostname() != null) { + sb.append(" HOSTNAME = ?"); appendConjunction = true; } - - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getHostname(), " HOSTNAME = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getAppId(), " APP_ID = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getInstanceId(), " INSTANCE_ID = ?"); - appendConjunction = DefaultCondition.append(sb, appendConjunction, - getStartTime(), " SERVER_TIME >= ?"); - DefaultCondition.append(sb, appendConjunction, getEndTime(), - " SERVER_TIME < ?"); - + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getAppId() != null) { + sb.append(" APP_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getInstanceId() != null) { + sb.append(" INSTANCE_ID = ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + appendConjunction = false; + if (getStartTime() != null) { + sb.append(" SERVER_TIME >= ?"); + appendConjunction = true; + } + if (appendConjunction) { + sb.append(" AND"); + } + if (getEndTime() != null) { + sb.append(" SERVER_TIME < ?"); + } return sb.toString(); } - - @Override - public String getOrderByClause() { - return adaptee.getOrderByClause(); - } - - @Override - public String getStatement() { - return adaptee.getStatement(); - } - - @Override - public Long getStartTime() { - return adaptee.getStartTime(); - } - - @Override - public Long getEndTime() { - return adaptee.getEndTime(); - } - - @Override - public Integer getLimit() { - return adaptee.getLimit(); - } - - @Override - public Integer getFetchSize() { - return adaptee.getFetchSize(); - } - - @Override - public void setFetchSize(Integer fetchSize) { - adaptee.setFetchSize(fetchSize); - } - - @Override - public void addOrderByColumn(String column) { - adaptee.addOrderByColumn(column); - } - - @Override - public void setNoLimit() { - adaptee.setNoLimit(); - } - - public List<String> getOriginalMetricNames() { - return adaptee.getMetricNames(); - } - - public void setCurrentMetric(String currentMetric) { - this.currentMetric = currentMetric; - } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java index f4f895f..cab154b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java @@ -27,7 +27,6 @@ import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; public class TimelineMetricAggregator extends AbstractTimelineAggregator { @@ -79,7 +78,7 @@ public class TimelineMetricAggregator extends AbstractTimelineAggregator { @Override protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java index e291f36..654c188 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; @@ -92,7 +91,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator @Override protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java index 1d5c5a4..7764ea3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java @@ -31,7 +31,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL; @@ -90,7 +89,7 @@ public class TimelineMetricClusterAggregatorHourly extends @Override protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java index 2da9c82..f7e53f5 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java @@ -38,7 +38,6 @@ import java.util.Map; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.fail; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; @@ -96,7 +95,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { boolean success = agg.doWork(startTime, endTime); //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA))); @@ -156,7 +155,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { boolean success = agg.doWork(startTime, endTime); //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA))); http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java index 22e1363..d166a22 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java @@ -39,7 +39,6 @@ import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; import static junit.framework.Assert.fail; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; @@ -85,7 +84,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local"); hdb.insertMetricRecords(metricsSent); - Condition queryCondition = new DefaultCondition(null, "local", null, null, + Condition queryCondition = new Condition(null, "local", null, null, startTime, startTime + (15 * 60 * 1000), null, false); TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition); @@ -121,7 +120,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { boolean success = aggregatorMinute.doWork(startTime, endTime); //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), @@ -200,7 +199,7 @@ public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { assertTrue(success); //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new Condition(null, null, null, null, startTime, endTime, null, true); condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java index 333b13b..1659e46 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java @@ -24,14 +24,12 @@ import java.util.Arrays; import java.util.Collections; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DefaultCondition; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LikeCondition; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.SplitByMetricNamesCondition; public class TestPhoenixTransactSQL { @Test public void testConditionClause() throws Exception { - Condition condition = new DefaultCondition( + Condition condition = new Condition( Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1", 1407959718L, 1407959918L, null, false); @@ -44,23 +42,6 @@ public class TestPhoenixTransactSQL { } @Test - public void testSplitByMetricNamesCondition() throws Exception { - Condition c = new DefaultCondition( - Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1", - 1407959718L, 1407959918L, null, false); - - SplitByMetricNamesCondition condition = new SplitByMetricNamesCondition(c); - condition.setCurrentMetric(c.getMetricNames().get(0)); - - String preparedClause = condition.getConditionClause(); - String expectedClause = "METRIC_NAME = ? AND HOSTNAME = ? AND " + - "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?"; - - Assert.assertNotNull(preparedClause); - Assert.assertEquals(expectedClause, preparedClause); - } - - @Test public void testLikeConditionClause() throws Exception { Condition condition = new LikeCondition( Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1", http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java index e9aac45..585a28c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProvider.java @@ -137,8 +137,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { */ public Collection<Resource> populateResources() throws SystemException { // No open ended query support. - if (temporalInfo != null && (temporalInfo.getStartTime() == null - || temporalInfo.getEndTime() == null)) { + if (temporalInfo == null || temporalInfo.getStartTime() == null || + temporalInfo.getEndTime() == null) { return Collections.emptySet(); } @@ -163,9 +163,38 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { return Collections.emptySet(); } - String spec = getSpec(hostname, resource); + String metricsParam = getSetString(processRegexps(metrics.keySet()), -1); + // Reuse uriBuilder + uriBuilder.removeQuery(); + + if (metricsParam.length() > 0) { + uriBuilder.setParameter("metricNames", metricsParam); + } + + if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) { + uriBuilder.setParameter("hostname", hostname); + } + + String componentName = getComponentName(resource); + if (componentName != null && !componentName.isEmpty()) { + if (TIMELINE_APPID_MAP.containsKey(componentName)) { + componentName = TIMELINE_APPID_MAP.get(componentName); + } + uriBuilder.setParameter("appId", componentName); + } + + long startTime = temporalInfo.getStartTime(); + if (startTime != -1) { + uriBuilder.setParameter("startTime", String.valueOf(startTime)); + } + + long endTime = temporalInfo.getEndTime(); + if (endTime != -1) { + uriBuilder.setParameter("endTime", String.valueOf(endTime)); + } BufferedReader reader = null; + String spec = uriBuilder.toString(); try { LOG.debug("Metrics request url =" + spec); reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec))); @@ -176,9 +205,8 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { Set<String> patterns = createPatterns(metrics.keySet()); for (TimelineMetric metric : timelineMetrics.getMetrics()) { - if (metric.getMetricName() != null - && metric.getMetricValues() != null - && checkMetricName(patterns, metric.getMetricName())) { + if (metric.getMetricName() != null && metric.getMetricValues() != null + && checkMetricName(patterns, metric.getMetricName())) { populateResource(resource, metric); } } @@ -202,42 +230,6 @@ public abstract class AMSPropertyProvider extends MetricsPropertyProvider { return Collections.emptySet(); } - private String getSpec(String hostname, Resource resource) { - String metricsParam = getSetString(processRegexps(metrics.keySet()), -1); - // Reuse uriBuilder - uriBuilder.removeQuery(); - - if (metricsParam.length() > 0) { - uriBuilder.setParameter("metricNames", metricsParam); - } - - if (hostname != null && !hostname.isEmpty() && !hostname.equals(dummyHostName)) { - uriBuilder.setParameter("hostname", hostname); - } - - String componentName = getComponentName(resource); - if (componentName != null && !componentName.isEmpty()) { - if (TIMELINE_APPID_MAP.containsKey(componentName)) { - componentName = TIMELINE_APPID_MAP.get(componentName); - } - uriBuilder.setParameter("appId", componentName); - } - - if (temporalInfo != null) { - long startTime = temporalInfo.getStartTime(); - if (startTime != -1) { - uriBuilder.setParameter("startTime", String.valueOf(startTime)); - } - - long endTime = temporalInfo.getEndTime(); - if (endTime != -1) { - uriBuilder.setParameter("endTime", String.valueOf(endTime)); - } - } - - return uriBuilder.toString(); - } - private Set<String> createPatterns(Set<String> rawNames) { Pattern pattern = Pattern.compile(METRIC_REGEXP_PATTERN); Set<String> result = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/ambari/blob/102b4773/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java index 31df3e2..ae1e163 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java @@ -97,95 +97,6 @@ public class AMSPropertyProviderTest { } @Test - public void testPopulateResourcesForSingleHostMetricPointInTime() throws - Exception { - - // given - TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH); - TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); - ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); - Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host); - AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider( - propertyIds, - streamProvider, - sslConfiguration, - metricHostProvider, - CLUSTER_NAME_PROPERTY_ID, - HOST_NAME_PROPERTY_ID - ); - - Resource resource = new ResourceImpl(Resource.Type.Host); - resource.setProperty(HOST_NAME_PROPERTY_ID, "h1"); - Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap(); - Request request = PropertyHelper.getReadRequest(Collections.singleton - (PROPERTY_ID1), temporalInfoMap); - System.out.println(request); - - // when - Set<Resource> resources = - propertyProvider.populateResources(Collections.singleton(resource), request, null); - - // then - Assert.assertEquals(1, resources.size()); - Resource res = resources.iterator().next(); - Map<String, Object> properties = PropertyHelper.getProperties(res); - Assert.assertNotNull(properties); - URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188); - uriBuilder.addParameter("metricNames", "cpu_user"); - uriBuilder.addParameter("hostname", "h1"); - uriBuilder.addParameter("appId", "HOST"); - Assert.assertEquals(uriBuilder.toString(), streamProvider.getLastSpec()); - Double val = (Double) res.getPropertyValue(PROPERTY_ID1); - Assert.assertEquals(40.45, val, 0.001); - } - - @Test - public void testPopulateResourcesForMultipleHostMetricscPointInTime() throws Exception { - TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH); - TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); - ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); - - Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host); - AMSPropertyProvider propertyProvider = new AMSHostPropertyProvider( - propertyIds, - streamProvider, - sslConfiguration, - metricHostProvider, - CLUSTER_NAME_PROPERTY_ID, - HOST_NAME_PROPERTY_ID - ); - - Resource resource = new ResourceImpl(Resource.Type.Host); - resource.setProperty(HOST_NAME_PROPERTY_ID, "h1"); - Map<String, TemporalInfo> temporalInfoMap = Collections.emptyMap(); - Request request = PropertyHelper.getReadRequest( - new HashSet<String>() {{ add(PROPERTY_ID1); add(PROPERTY_ID2); }}, temporalInfoMap); - Set<Resource> resources = - propertyProvider.populateResources(Collections.singleton(resource), request, null); - Assert.assertEquals(1, resources.size()); - Resource res = resources.iterator().next(); - Map<String, Object> properties = PropertyHelper.getProperties(resources.iterator().next()); - Assert.assertNotNull(properties); - URIBuilder uriBuilder = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188); - uriBuilder.addParameter("metricNames", "cpu_user,mem_free"); - uriBuilder.addParameter("hostname", "h1"); - uriBuilder.addParameter("appId", "HOST"); - - URIBuilder uriBuilder2 = AMSPropertyProvider.getAMSUriBuilder("localhost", 8188); - uriBuilder2.addParameter("metricNames", "mem_free,cpu_user"); - uriBuilder2.addParameter("hostname", "h1"); - uriBuilder2.addParameter("appId", "HOST"); - System.out.println(streamProvider.getLastSpec()); - Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec()) - || uriBuilder2.toString().equals(streamProvider.getLastSpec())); - Double val1 = (Double) res.getPropertyValue(PROPERTY_ID1); - Assert.assertEquals(40.45, val1, 0.001); - Double val2 = (Double)res.getPropertyValue(PROPERTY_ID2); - Assert.assertEquals(2.47025664E8, val2, 0.1); - } - - - @Test public void testPopulateResourcesForMultipleHostMetrics() throws Exception { TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); @@ -228,14 +139,13 @@ public class AMSPropertyProviderTest { uriBuilder2.addParameter("startTime", "1416445244701"); uriBuilder2.addParameter("endTime", "1416445244901"); Assert.assertTrue(uriBuilder.toString().equals(streamProvider.getLastSpec()) - || uriBuilder2.toString().equals(streamProvider.getLastSpec())); + || uriBuilder2.toString().equals(streamProvider.getLastSpec())); Number[][] val = (Number[][]) res.getPropertyValue(PROPERTY_ID1); Assert.assertEquals(111, val.length); val = (Number[][]) res.getPropertyValue(PROPERTY_ID2); Assert.assertEquals(86, val.length); } - @Test public void testPopulateResourcesForRegexpMetrics() throws Exception { TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH);
