http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
new file mode 100644
index 0000000..4c4e673
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -0,0 +1,803 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import 
org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+
+/**
+ * Encapsulate all metrics related SQL queries.
+ */
+public class PhoenixTransactSQL {
+
+  public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
+
+  /**
+   * Create table to store individual metric records.
+   */
+  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " 
+
+    "EXISTS METRIC_RECORD (UUID BINARY(20) NOT NULL, " +
+    "SERVER_TIME BIGINT NOT NULL, " +
+    "METRIC_SUM DOUBLE, " +
+    "METRIC_COUNT UNSIGNED_INT, " +
+    "METRIC_MAX DOUBLE, " +
+    "METRIC_MIN DOUBLE, " +
+    "METRICS VARCHAR CONSTRAINT pk " +
+    "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', 
IMMUTABLE_ROWS=true, " +
+    "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_CONTAINER_METRICS_TABLE_SQL =
+      "CREATE TABLE IF NOT EXISTS CONTAINER_METRICS "
+      + "(APP_ID VARCHAR, "
+      + " CONTAINER_ID VARCHAR,"
+      + " START_TIME TIMESTAMP,"
+      + " FINISH_TIME TIMESTAMP, "
+      + " DURATION BIGINT,"
+      + " HOSTNAME VARCHAR,"
+      + " EXIT_CODE INTEGER,"
+      + " LOCALIZATION_DURATION BIGINT,"
+      + " LAUNCH_DURATION BIGINT,"
+      + " MEM_REQUESTED_GB DOUBLE,"
+      + " MEM_REQUESTED_GB_MILLIS DOUBLE,"
+      + " MEM_VIRTUAL_GB DOUBLE,"
+      + " MEM_USED_GB_MIN DOUBLE,"
+      + " MEM_USED_GB_MAX DOUBLE,"
+      + " MEM_USED_GB_AVG DOUBLE,"
+      + " MEM_USED_GB_50_PCT DOUBLE,"
+      + " MEM_USED_GB_75_PCT DOUBLE,"
+      + " MEM_USED_GB_90_PCT DOUBLE,"
+      + " MEM_USED_GB_95_PCT DOUBLE,"
+      + " MEM_USED_GB_99_PCT DOUBLE,"
+      + " MEM_UNUSED_GB DOUBLE,"
+      + " MEM_UNUSED_GB_MILLIS DOUBLE "
+      + " CONSTRAINT pk PRIMARY KEY(APP_ID, CONTAINER_ID)) 
DATA_BLOCK_ENCODING='%s',"
+      + " IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS %s " +
+      "(UUID BINARY(20) NOT NULL, " +
+      "SERVER_TIME BIGINT NOT NULL, " +
+      "METRIC_SUM DOUBLE," +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE," +
+      "METRIC_MIN DOUBLE CONSTRAINT pk " +
+      "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) 
DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
+      " COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS %s " +
+      "(UUID BINARY(16) NOT NULL, " +
+      "SERVER_TIME BIGINT NOT NULL, " +
+      "METRIC_SUM DOUBLE, " +
+      "HOSTS_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) 
DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  // HOSTS_COUNT vs METRIC_COUNT
+  public static final String 
CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS %s " +
+      "(UUID BINARY(16) NOT NULL, " +
+      "SERVER_TIME BIGINT NOT NULL, " +
+      "METRIC_SUM DOUBLE, " +
+      "METRIC_COUNT UNSIGNED_INT, " +
+      "METRIC_MAX DOUBLE, " +
+      "METRIC_MIN DOUBLE " +
+      "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) 
DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
+      "TTL=%s, COMPRESSION='%s'";
+
+  public static final String CREATE_METRICS_METADATA_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS METRICS_METADATA " +
+      "(METRIC_NAME VARCHAR, " +
+      "APP_ID VARCHAR, " +
+      "INSTANCE_ID VARCHAR, " +
+      "UUID BINARY(16), " +
+      "UNITS CHAR(20), " +
+      "TYPE CHAR(20), " +
+      "START_TIME UNSIGNED_LONG, " +
+      "SUPPORTS_AGGREGATION BOOLEAN, " +
+      "IS_WHITELISTED BOOLEAN " +
+      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID)) " +
+      "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+
+  public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA " +
+      "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " +
+      "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" +
+      "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+
+  public static final String CREATE_INSTANCE_HOST_TABLE_SQL =
+    "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " +
+      "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " +
+      "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" +
+      "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
+
+  public static final String ALTER_METRICS_METADATA_TABLE =
+    "ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN";
+
+  /**
+   * ALTER table to set new options
+   */
+  public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
+
+  /**
+   * Insert into metric records table.
+   */
+  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
+    "(UUID, " +
+    "SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS) VALUES " +
+    "(?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CONTAINER_METRICS_SQL = "UPSERT INTO %s " +
+      "(APP_ID,"
+      + " CONTAINER_ID,"
+      + " START_TIME,"
+      + " FINISH_TIME,"
+      + " DURATION,"
+      + " HOSTNAME,"
+      + " EXIT_CODE,"
+      + " LOCALIZATION_DURATION,"
+      + " LAUNCH_DURATION,"
+      + " MEM_REQUESTED_GB,"
+      + " MEM_REQUESTED_GB_MILLIS,"
+      + " MEM_VIRTUAL_GB,"
+      + " MEM_USED_GB_MIN,"
+      + " MEM_USED_GB_MAX,"
+      + " MEM_USED_GB_AVG,"
+      + " MEM_USED_GB_50_PCT,"
+      + " MEM_USED_GB_75_PCT,"
+      + " MEM_USED_GB_90_PCT,"
+      + " MEM_USED_GB_95_PCT,"
+      + " MEM_USED_GB_99_PCT,"
+      + " MEM_UNUSED_GB,"
+      + " MEM_UNUSED_GB_MILLIS) VALUES " +
+      "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
+    "%s (UUID, " +
+    "SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" 
+
+    " %s (UUID, SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "METRIC_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN) " +
+    "VALUES (?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
+    "%s (UUID, " +
+    "SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN," +
+    "METRIC_COUNT) " +
+    "VALUES (?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_METADATA_SQL =
+    "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, INSTANCE_ID, UUID, 
UNITS, TYPE, " +
+      "START_TIME, SUPPORTS_AGGREGATION, IS_WHITELISTED) " +
+      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  public static final String UPSERT_HOSTED_APPS_METADATA_SQL =
+    "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, UUID, APP_IDS) VALUES (?, ?, 
?)";
+
+  public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
+    "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
+
+  /**
+   * Retrieve a set of rows from metrics records table.
+   */
+  public static final String GET_METRIC_SQL = "SELECT UUID, SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT, " +
+    "METRICS " +
+    "FROM %s";
+
+  /**
+   * Get latest metrics for a number of hosts
+   *
+   * Different queries for a number and a single hosts are used due to bug
+   * in Apache Phoenix
+   */
+  public static final String GET_LATEST_METRIC_SQL = "SELECT %s E.UUID AS 
UUID, " +
+    "E.SERVER_TIME AS SERVER_TIME, " +
+    "E.METRIC_SUM AS METRIC_SUM, " +
+    "E.METRIC_MAX AS METRIC_MAX, E.METRIC_MIN AS METRIC_MIN, " +
+    "E.METRIC_COUNT AS METRIC_COUNT, E.METRICS AS METRICS " +
+    "FROM %s AS E " +
+    "INNER JOIN " +
+    "(SELECT UUID, MAX(SERVER_TIME) AS MAX_SERVER_TIME " +
+    "FROM %s " +
+    "WHERE " +
+    "%s " +
+    "GROUP BY UUID) " +
+    "AS I " +
+    "ON E.UUID=I.UUID " +
+    "AND E.SERVER_TIME=I.MAX_SERVER_TIME";
+
+  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT UUID, " +
+    "SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN, " +
+    "METRIC_COUNT " +
+    "FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT " +
+    "UUID, " +
+    "SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "HOSTS_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN " +
+    "FROM %s";
+
+  public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = "SELECT " +
+    "UUID, " +
+    "SERVER_TIME, " +
+    "METRIC_SUM, " +
+    "METRIC_COUNT, " +
+    "METRIC_MAX, " +
+    "METRIC_MIN " +
+    "FROM %s";
+
+  public static final String TOP_N_INNER_SQL = "SELECT UUID " +
+    "FROM %s WHERE %s GROUP BY UUID ORDER BY %s LIMIT %s";
+
+  public static final String GET_METRIC_METADATA_SQL = "SELECT " +
+    "METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, START_TIME, " +
+    "SUPPORTS_AGGREGATION, IS_WHITELISTED FROM METRICS_METADATA";
+
+  public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " +
+    "HOSTNAME, UUID, APP_IDS FROM HOSTED_APPS_METADATA";
+
+  public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " +
+    "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA";
+
+  /**
+   * Aggregate host metrics using a GROUP BY clause to take advantage of
+   * N - way parallel scan where N = number of regions.
+   */
+  public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT 
" +
+    "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, 
METRIC_MIN) " +
+    "SELECT UUID, %s AS SERVER_TIME, " +
+    "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
+    "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " +
+    "GROUP BY UUID";
+
+  /**
+   * Downsample host metrics.
+   */
+  public static final String DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX = 
"UPSERT INTO %s (UUID, SERVER_TIME, " +
+    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) ";
+
+  public static final String TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT 
UUID, " +
+    "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME 
> %s AND SERVER_TIME <= %s " +
+    "GROUP BY UUID ORDER BY %s DESC LIMIT %s";
+
+  /**
+   * Aggregate app metrics using a GROUP BY clause to take advantage of
+   * N - way parallel scan where N = number of regions.
+   */
+  public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " 
+
+         "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, 
METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
+         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), 
MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
+         "SERVER_TIME <= %s GROUP BY UUID";
+
+  /**
+   * Downsample cluster metrics.
+   */
+  public static final String DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX = 
"UPSERT INTO %s (UUID, SERVER_TIME, " +
+    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) ";
+
+  public static final String TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = 
"SELECT UUID, " +
+    "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME 
> %s AND SERVER_TIME <= %s " +
+    "GROUP BY UUID ORDER BY %s DESC LIMIT %s";
+
+  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
+
+  public static final String CONTAINER_METRICS_TABLE_NAME = 
"CONTAINER_METRICS";
+
+  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_RECORD_MINUTE";
+  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_RECORD_HOURLY";
+  public static final String METRICS_AGGREGATE_DAILY_TABLE_NAME =
+    "METRIC_RECORD_DAILY";
+  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
+    "METRIC_AGGREGATE";
+  public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME =
+    "METRIC_AGGREGATE_MINUTE";
+  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
+    "METRIC_AGGREGATE_HOURLY";
+  public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
+    "METRIC_AGGREGATE_DAILY";
+
+  public static final String PHOENIX_TABLES_REGEX_PATTERN_STRING = "METRIC_.*";
+
+  public static final String[] PHOENIX_TABLES = {
+    METRICS_RECORD_TABLE_NAME,
+    METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+    METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+    METRICS_AGGREGATE_DAILY_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
+    METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME
+  };
+
+  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
+  public static final String DEFAULT_ENCODING = "FAST_DIFF";
+  public static final long HOUR = 3600000; // 1 hour
+  public static final long DAY = 86400000; // 1 day
+  private static boolean sortMergeJoinEnabled = false;
+
+  /**
+   * Filter to optimize HBase scan by using file timestamps. This prevents
+   * a full table scan of metric records.
+   *
+   * @return Phoenix Hint String
+   */
+  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
+    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
+  }
+
+  /**
+   * Falling back to sort merge join algorithm if default queries fail.
+   *
+   * @return Phoenix Hint String
+   */
+  public static String getLatestMetricsHints() {
+    if (sortMergeJoinEnabled) {
+      return "/*+ USE_SORT_MERGE_JOIN NO_CACHE */";
+    }
+    return "";
+  }
+
  /**
   * Toggle the sort-merge-join fallback hint emitted by
   * {@link #getLatestMetricsHints()}.
   */
  public static void setSortMergeJoinEnabled(boolean sortMergeJoinEnabled) {
    PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled;
  }
+
  /**
   * Prepare a SELECT over the per-host metric tables, choosing the table and
   * projection from the condition's precision (raw METRIC_RECORD for the
   * default/seconds case, pre-aggregated tables otherwise), then appending
   * WHERE / ORDER BY / LIMIT clauses and binding all parameters.
   *
   * @param connection open Phoenix connection; caller retains ownership
   * @param condition  non-empty query condition (validated here)
   * @return a bound {@link PreparedStatement}; caller must close it
   * @throws SQLException if preparation or parameter binding fails
   */
  public static PreparedStatement prepareGetMetricsSqlStmt(Connection connection,
                                                           Condition condition) throws SQLException {

    validateConditionIsNotEmpty(condition);
    validateRowCountLimit(condition);

    String stmtStr;
    // A caller-supplied statement wins over the generated one.
    if (condition.getStatement() != null) {
      stmtStr = condition.getStatement();
    } else {
      String metricsTable;
      String query;
      // No precision given: derive one from the requested time range.
      if (condition.getPrecision() == null) {
        long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
        long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
        Precision precision = Precision.getPrecision(startTime, endTime);
        condition.setPrecision(precision);
      }
      // Map precision to table. Only the raw-record default query selects the
      // METRICS blob column; aggregate tables use the aggregate-only query.
      switch (condition.getPrecision()) {
        case DAYS:
          metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME;
          query = GET_METRIC_AGGREGATE_ONLY_SQL;
          break;
        case HOURS:
          metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
          query = GET_METRIC_AGGREGATE_ONLY_SQL;
          break;
        case MINUTES:
          metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
          query = GET_METRIC_AGGREGATE_ONLY_SQL;
          break;
        default:
          metricsTable = METRICS_RECORD_TABLE_NAME;
          query = GET_METRIC_SQL;
      }

      stmtStr = String.format(query, metricsTable);
    }

    StringBuilder sb = new StringBuilder(stmtStr);

    // EmptyCondition statements are complete as-is: no WHERE/ORDER BY added.
    if (!(condition instanceof EmptyCondition)) {
      sb.append(" WHERE ");
      sb.append(condition.getConditionClause());
      String orderByClause = condition.getOrderByClause(true);
      if (orderByClause != null) {
        sb.append(orderByClause);
      } else {
        sb.append(" ORDER BY UUID, SERVER_TIME ");
      }
    }

    if (condition.getLimit() != null) {
      sb.append(" LIMIT ").append(condition.getLimit());
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
    }

    PreparedStatement stmt = null;
    try {
      stmt = connection.prepareStatement(sb.toString());
      int pos = 1;
      pos = addUuids(condition, pos, stmt);

      // TopN conditions embed an inner query, so the start/end time
      // placeholders occur twice and must be bound twice (here and below).
      if (condition instanceof TopNCondition) {
        pos = addStartTime(condition, pos, stmt);
        pos = addEndTime(condition, pos, stmt);
      }

      pos = addStartTime(condition, pos, stmt);
      addEndTime(condition, pos, stmt);

      if (condition.getFetchSize() != null) {
        stmt.setFetchSize(condition.getFetchSize());
      }
    } catch (SQLException e) {
      // Don't leak the statement if binding fails.
      if (stmt != null) {
        stmt.close();
      }
      throw e;
    }

    if (condition instanceof TopNCondition) {
      LOG.info(sb.toString());
    }
    return stmt;
  }
+
+  private static void validateConditionIsNotEmpty(Condition condition) {
+    if (condition.isEmpty()) {
+      throw new IllegalArgumentException("Condition is empty.");
+    }
+  }
+
  /**
   * Guard against queries whose estimated result set would exceed the
   * configured limit. Rows-per-metric is estimated from the requested time
   * range and the (passed-in or computed) precision, then multiplied by the
   * number of metrics and hosts requested.
   *
   * @param condition query condition; a null/empty metric-name list is
   *                  allowed and skips the check (aggregator queries)
   * @throws PrecisionLimitExceededException when the estimate exceeds
   *         {@link PhoenixHBaseAccessor#RESULTSET_LIMIT}
   */
  private static void validateRowCountLimit(Condition condition) {
    if (condition.getMetricNames() == null
      || condition.getMetricNames().isEmpty()) {
      //aggregator can use empty metrics query
      return;
    }

    long range = condition.getEndTime() - condition.getStartTime();
    long rowsPerMetric;

    //Get Precision (passed in or computed) and estimate values returned based on that.
    Precision precision = condition.getPrecision();
    if (precision == null) {
      precision = Precision.getPrecision(condition.getStartTime(), condition.getEndTime());
    }

    // Each precision tier stores data at a known cadence; divide the range by
    // that cadence to estimate the row count per metric.
    switch (precision) {
      case DAYS:
        rowsPerMetric = TimeUnit.MILLISECONDS.toDays(range);
        break;
      case HOURS:
        rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range);
        break;
      case MINUTES:
        rowsPerMetric = TimeUnit.MILLISECONDS.toMinutes(range)/5; //5 minute data in METRIC_AGGREGATE_MINUTE table.
        break;
      default:
        rowsPerMetric = TimeUnit.MILLISECONDS.toSeconds(range)/10; //10 second data in METRIC_AGGREGATE table
    }

    List<String> hostNames = condition.getHostnames();
    int numHosts = (hostNames == null || hostNames.isEmpty()) ? 1 : condition.getHostnames().size();

    long totalRowsRequested = rowsPerMetric * condition.getMetricNames().size() * numHosts;

    if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
      throw new PrecisionLimitExceededException("Requested " +  condition.getMetricNames().size() + " metrics for "
        + numHosts + " hosts in " + precision +  " precision for the time range of " + range/1000
        + " seconds. Estimated resultset size of " + totalRowsRequested + " is greater than the limit of "
        + PhoenixHBaseAccessor.RESULTSET_LIMIT + ". Request lower precision or fewer number of metrics or hosts." +
        " Alternatively, increase the limit value through ams-site:timeline.metrics.service.default.result.limit config");
    }
  }
+
  /**
   * Prepare the point-in-time ("latest value") query for host metrics: an
   * inner-join of METRIC_RECORD against a per-UUID MAX(SERVER_TIME) subquery.
   *
   * @param connection open Phoenix connection; caller retains ownership
   * @param condition  must be non-empty and carry at least one metric name
   * @return a bound {@link PreparedStatement}; caller must close it
   * @throws SQLException if preparation or parameter binding fails
   * @throws IllegalArgumentException if the condition is empty or lacks
   *         metric names
   */
  public static PreparedStatement prepareGetLatestMetricSqlStmt(
    Connection connection, Condition condition) throws SQLException {

    validateConditionIsNotEmpty(condition);

    if (condition.getMetricNames() == null
      || condition.getMetricNames().isEmpty()) {
      throw new IllegalArgumentException("Point in time query without "
        + "metric names not supported ");
    }

    String stmtStr;
    // A caller-supplied statement wins over the generated one.
    if (condition.getStatement() != null) {
      stmtStr = condition.getStatement();
    } else {
      // Table name appears twice in GET_LATEST_METRIC_SQL (outer + subquery).
      stmtStr = String.format(GET_LATEST_METRIC_SQL,
        getLatestMetricsHints(),
        METRICS_RECORD_TABLE_NAME,
        METRICS_RECORD_TABLE_NAME,
        condition.getConditionClause());
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("SQL: " + stmtStr + ", condition: " + condition);
    }
    PreparedStatement stmt = null;
    try {
      stmt = connection.prepareStatement(stmtStr);
      setQueryParameters(stmt, condition);
    } catch (SQLException e) {
      // Don't leak the statement if binding fails.
      if (stmt != null) {
        stmt.close();
      }
      throw e;
    }

    return stmt;
  }
+
  /**
   * Bind UUID parameters for the latest-metric query, repeating the UUID list
   * until every '?' placeholder in the statement has a value (the statement's
   * subquery repeats the UUID IN-clause).
   *
   * @return the same statement, for chaining
   */
  private static PreparedStatement setQueryParameters(PreparedStatement stmt,
                                                      Condition condition) throws SQLException {
    int pos = 1;
    //For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times
    do {
      if (condition.getUuids() != null) {
        for (byte[] uuid : condition.getUuids()) {
          stmt.setBytes(pos++, uuid);
        }
      }
      if (condition.getFetchSize() != null) {
        stmt.setFetchSize(condition.getFetchSize());
        // NOTE(review): setFetchSize does not consume a '?' placeholder, so
        // this pos++ looks suspicious — it may only exist to guarantee loop
        // termination when no UUIDs are present. Confirm before changing.
        pos++;
      }
    } while (pos < stmt.getParameterMetaData().getParameterCount());

    return stmt;
  }
+
  /**
   * Prepare a SELECT over the cluster-aggregate tables, choosing the table
   * from the condition's precision (the raw METRIC_AGGREGATE table reports
   * HOSTS_COUNT; time-rolled tables report METRIC_COUNT), then appending the
   * WHERE / ORDER BY / LIMIT clauses and binding all parameters.
   *
   * @param connection open Phoenix connection; caller retains ownership
   * @param condition  non-empty query condition (validated here)
   * @return a bound {@link PreparedStatement}; caller must close it
   * @throws SQLException if preparation or parameter binding fails
   */
  public static PreparedStatement prepareGetAggregateSqlStmt(
    Connection connection, Condition condition) throws SQLException {

    validateConditionIsNotEmpty(condition);
    validateRowCountLimit(condition);

    String metricsAggregateTable;
    String queryStmt;
    // No precision given: derive one from the requested time range.
    if (condition.getPrecision() == null) {
      long endTime = condition.getEndTime() == null ? System.currentTimeMillis() : condition.getEndTime();
      long startTime = condition.getStartTime() == null ? 0 : condition.getStartTime();
      condition.setPrecision(Precision.getPrecision(startTime, endTime));
    }
    switch (condition.getPrecision()) {
      case DAYS:
        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
        break;
      case HOURS:
        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
        break;
      case MINUTES:
        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
        break;
      default:
        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
        queryStmt = GET_CLUSTER_AGGREGATE_SQL;
    }

    queryStmt = String.format(queryStmt, metricsAggregateTable);

    StringBuilder sb = new StringBuilder(queryStmt);
    sb.append(" WHERE ");
    sb.append(condition.getConditionClause());
    sb.append(" ORDER BY UUID, SERVER_TIME");
    if (condition.getLimit() != null) {
      sb.append(" LIMIT ").append(condition.getLimit());
    }

    String query = sb.toString();

    if (LOG.isDebugEnabled()) {
      LOG.debug("SQL => " + query + ", condition => " + condition);
    }
    PreparedStatement stmt = null;
    try {
      stmt = connection.prepareStatement(query);
      int pos = 1;

      pos = addUuids(condition, pos, stmt);

      // TopN conditions embed an inner query, so the start/end time
      // placeholders occur twice and must be bound twice (here and below).
      if (condition instanceof TopNCondition) {
        pos = addStartTime(condition, pos, stmt);
        pos = addEndTime(condition, pos, stmt);
      }

      // TODO: Upper case all strings on POST
      pos = addStartTime(condition, pos, stmt);
      addEndTime(condition, pos, stmt);
    } catch (SQLException e) {
      // Don't leak the statement if binding fails.
      if (stmt != null) {
        stmt.close();
      }
      throw e;
    }

    if (condition instanceof TopNCondition) {
      LOG.info(sb.toString());
    }
    return stmt;
  }
+
+  public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
+    Connection connection, SplitByMetricNamesCondition condition) throws 
SQLException {
+
+    validateConditionIsNotEmpty(condition);
+
+    String stmtStr;
+    if (condition.getStatement() != null) {
+      stmtStr = condition.getStatement();
+    } else {
+      stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL,
+        METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+    }
+
+    StringBuilder sb = new StringBuilder(stmtStr);
+    sb.append(" WHERE ");
+    sb.append(condition.getConditionClause());
+    String orderByClause = condition.getOrderByClause(false);
+    if (orderByClause != null) {
+      sb.append(orderByClause);
+    } else {
+      sb.append(" ORDER BY UUID DESC, SERVER_TIME DESC  ");
+    }
+
+    sb.append(" LIMIT ").append(condition.getMetricNames().size());
+
+    String query = sb.toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL: " + query + ", condition: " + condition);
+    }
+
+    PreparedStatement stmt = null;
+    try {
+      stmt = connection.prepareStatement(query);
+      int pos = 1;
+      if (condition.getMetricNames() != null) {
+        for (; pos <= condition.getMetricNames().size(); pos++) {
+          stmt.setBytes(pos, condition.getCurrentUuid());
+        }
+      }
+    } catch (SQLException e) {
+      if (stmt != null) {
+
+      }
+      throw e;
+    }
+
+    return stmt;
+  }
+
+  public static String getTargetTableUsingPrecision(Precision precision, 
boolean withHosts) {
+
+    String inputTable = null;
+    if (precision != null) {
+      if (withHosts) {
+        switch (precision) {
+          case DAYS:
+            inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+            break;
+          case HOURS:
+            inputTable = 
PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+            break;
+          case MINUTES:
+            inputTable = 
PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+            break;
+          default:
+            inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+        }
+      } else {
+        switch (precision) {
+          case DAYS:
+            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+            break;
+          case HOURS:
+            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+            break;
+          case MINUTES:
+            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+            break;
+          default:
+            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+        }
+      }
+    } else {
+      if (withHosts) {
+        inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+      } else {
+        inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+      }
+    }
+    return inputTable;
+  }
+
  /**
   * Bind each UUID from the condition starting at position {@code pos}.
   * UUIDs whose length is not the full host+metric UUID length are bound as
   * strings rather than bytes — presumably these are pattern/partial entries
   * in the condition clause; TODO confirm against Condition implementations.
   *
   * @return the next unused parameter position
   */
  private static int addUuids(Condition condition, int pos, PreparedStatement stmt) throws SQLException {
    if (condition.getUuids() != null) {
      for (int pos2 = 1 ; pos2 <= condition.getUuids().size(); pos2++,pos++) {
        byte[] uuid = condition.getUuids().get(pos2 - 1);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Setting pos: " + pos + ", value = " + new String(uuid));
        }

        // Full-length binary key -> setBytes; anything else -> setString.
        if (uuid.length != TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH + TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) {
          stmt.setString(pos, new String(uuid));
        } else {
          stmt.setBytes(pos, uuid);
        }
      }
    }
    return pos;
  }
+
+  private static int addStartTime(Condition condition, int pos, 
PreparedStatement stmt) throws SQLException {
+    if (condition.getStartTime() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getStartTime());
+      }
+      stmt.setLong(pos++, condition.getStartTime());
+    }
+    return pos;
+  }
+
+  private static int addEndTime(Condition condition, int pos, 
PreparedStatement stmt) throws SQLException {
+
+    if (condition.getEndTime() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getEndTime());
+      }
+      stmt.setLong(pos++, condition.getEndTime());
+    }
+    return pos;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java
new file mode 100644
index 0000000..2f1697f
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/SplitByMetricNamesCondition.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+// TODO get rid of this class
+public class SplitByMetricNamesCondition implements Condition {
+  private final Condition adaptee;
+  private byte[] currentUuid;
+  private boolean metricNamesNotCondition = false;
+
+  public SplitByMetricNamesCondition(Condition condition){
+    this.adaptee = condition;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return adaptee.isEmpty();
+  }
+
+  @Override
+  public List<byte[]> getUuids() {
+    return adaptee.getUuids();
+  }
+
+  @Override
+  public List<String> getMetricNames() {
+    return Collections.singletonList(new String(currentUuid));
+  }
+
+  @Override
+  public boolean isPointInTime() {
+    return adaptee.isPointInTime();
+  }
+
+  @Override
+  public boolean isGrouped() {
+    return adaptee.isGrouped();
+  }
+
+  @Override
+  public void setStatement(String statement) {
+    adaptee.setStatement(statement);
+  }
+
+  @Override
+  public List<String> getHostnames() {
+    return adaptee.getHostnames();
+  }
+
+  @Override
+  public Precision getPrecision() {
+    return adaptee.getPrecision();
+  }
+
+  @Override
+  public void setPrecision(Precision precision) {
+    adaptee.setPrecision(precision);
+  }
+
+  @Override
+  public String getAppId() {
+    return adaptee.getAppId();
+  }
+
+  @Override
+  public String getInstanceId() {
+    return adaptee.getInstanceId();
+  }
+
+  @Override
+  public StringBuilder getConditionClause() {
+    StringBuilder sb = new StringBuilder();
+    boolean appendConjunction = false;
+
+    if (getMetricNames() != null) {
+      for (String name : getMetricNames()) {
+        if (sb.length() > 1) {
+          sb.append(" OR ");
+        }
+        sb.append("UUID = ?");
+      }
+
+      appendConjunction = true;
+    }
+
+    appendConjunction = DefaultCondition.append(sb, appendConjunction,
+      getStartTime(), " SERVER_TIME >= ?");
+    DefaultCondition.append(sb, appendConjunction, getEndTime(),
+      " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  @Override
+  public String getOrderByClause(boolean asc) {
+    return adaptee.getOrderByClause(asc);
+  }
+
+  @Override
+  public String getStatement() {
+    return adaptee.getStatement();
+  }
+
+  @Override
+  public Long getStartTime() {
+    return adaptee.getStartTime();
+  }
+
+  @Override
+  public Long getEndTime() {
+    return adaptee.getEndTime();
+  }
+
+  @Override
+  public Integer getLimit() {
+    return adaptee.getLimit();
+  }
+
+  @Override
+  public Integer getFetchSize() {
+    return adaptee.getFetchSize();
+  }
+
+  @Override
+  public void setFetchSize(Integer fetchSize) {
+    adaptee.setFetchSize(fetchSize);
+  }
+
+  @Override
+  public void addOrderByColumn(String column) {
+    adaptee.addOrderByColumn(column);
+  }
+
+  @Override
+  public void setNoLimit() {
+    adaptee.setNoLimit();
+  }
+
+  @Override
+  public boolean doUpdate() {
+    return false;
+  }
+
+  public List<String> getOriginalMetricNames() {
+    return adaptee.getMetricNames();
+  }
+
+  public void setCurrentUuid(byte[] uuid) {
+    this.currentUuid = uuid;
+  }
+
+  public byte[] getCurrentUuid() {
+    return currentUuid;
+  }
+
+ @Override
+  public void setMetricNamesNotCondition(boolean metricNamesNotCondition) {
+    this.metricNamesNotCondition = metricNamesNotCondition;
+  }
+
+  @Override
+  public void setHostnamesNotCondition(boolean hostNamesNotCondition) {
+
+  }
+
+  @Override
+  public void setUuidNotCondition(boolean uuidNotCondition) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java
new file mode 100644
index 0000000..e951cd5
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/TopNCondition.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.query;
+
+import java.util.List;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+
+public class TopNCondition extends DefaultCondition{
+
+  private Integer topN;
+  private boolean isBottomN;
+  private Function topNFunction;
+  private static final Log LOG = LogFactory.getLog(TopNCondition.class);
+
+  public TopNCondition(List<byte[]> uuids, List<String> metricNames, 
List<String> hostnames, String appId,
+                          String instanceId, Long startTime, Long endTime, 
Precision precision,
+                          Integer limit, boolean grouped, Integer topN, 
Function topNFunction,
+                          boolean isBottomN) {
+    super(uuids, metricNames, hostnames, appId, instanceId, startTime, 
endTime, precision, limit, grouped);
+    this.topN = topN;
+    this.isBottomN = isBottomN;
+    this.topNFunction = topNFunction;
+  }
+
+  @Override
+  public StringBuilder getConditionClause() {
+
+
+    if (!(isTopNHostCondition(metricNames, hostnames) || 
isTopNMetricCondition(metricNames, hostnames))) {
+      LOG.error("Unsupported TopN Operation requested. Query can have either 
multiple hosts or multiple metric names " +
+        "but not both.");
+      return null;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(" UUID IN (");
+    sb.append(getTopNInnerQuery());
+    sb.append(")");
+
+    boolean appendConjunction = true;
+    appendConjunction = append(sb, appendConjunction, getStartTime(), " 
SERVER_TIME >= ?");
+    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
+
+    return sb;
+  }
+
+  public String getTopNInnerQuery() {
+    return String.format(PhoenixTransactSQL.TOP_N_INNER_SQL,
+      PhoenixTransactSQL.getTargetTableUsingPrecision(precision, 
CollectionUtils.isNotEmpty(hostnames)),
+      super.getConditionClause().toString(), getTopNOrderByClause(), topN);
+  }
+
+  private String getTopNOrderByClause() {
+
+    String orderByClause = getColumnSelect(this.topNFunction);
+    orderByClause += (isBottomN ? " ASC" : " DESC");
+    return  orderByClause;
+  }
+
+  public static String getColumnSelect(Function topNFunction) {
+    String columnSelect = null;
+    if (topNFunction != null) {
+      switch (topNFunction.getReadFunction()) {
+        case AVG:
+          columnSelect = "ROUND(AVG(METRIC_SUM),2)";
+          break;
+        case SUM:
+          columnSelect = "SUM(METRIC_SUM)";
+          break;
+        default:
+          columnSelect = "MAX(METRIC_MAX)";
+          break;
+      }
+    }
+    if (columnSelect == null) {
+      columnSelect = "MAX(METRIC_MAX)";
+    }
+    return  columnSelect;
+  }
+
+  public boolean isTopNHostCondition() {
+    return isTopNHostCondition(metricNames, hostnames);
+  }
+
+  public boolean isTopNMetricCondition() {
+    return isTopNMetricCondition(metricNames, hostnames);
+  }
+
+  /**
+   * Check if this is a case of Top N hosts condition
+   * @param metricNames A list of Strings.
+   * @param hostnames A list of Strings.
+   * @return True if it is a Case of Top N Hosts (1 Metric and H hosts).
+   */
+  public static boolean isTopNHostCondition(List<String> metricNames, 
List<String> hostnames) {
+    // Case 1 : 1 Metric, H hosts
+    // Select Top N or Bottom N host series based on 1 metric (max/avg/sum)
+    // Hostnames cannot be empty
+    // Only 1 metric allowed, without wildcards
+    return (CollectionUtils.isNotEmpty(hostnames) && metricNames.size() == 1 
&& !metricNamesHaveWildcard(metricNames));
+
+  }
+
+  /**
+   * Check if this is a case of Top N metrics condition
+   * @param metricNames A list of Strings.
+   * @param hostnames A list of Strings.
+   * @return True if it is a Case of Top N Metrics (M Metric and 1 or 0 host).
+   */
+  public static boolean isTopNMetricCondition(List<String> metricNames, 
List<String> hostnames) {
+    // Case 2 : M Metric names or Regex, 1 or No host
+    // Select Top N or Bottom N metric series based on metric 
values(max/avg/sum)
+    // MetricNames cannot be empty
+    // No host (aggregate) or 1 host allowed, without wildcards
+    return (CollectionUtils.isNotEmpty(metricNames) && (hostnames == null || 
hostnames.size() <= 1) &&
+      !hostNamesHaveWildcard(hostnames));
+  }
+
+  public Integer getTopN() {
+    return topN;
+  }
+
+  public void setTopN(Integer topN) {
+    this.topN = topN;
+  }
+
+  public boolean isBottomN() {
+    return isBottomN;
+  }
+
+  public void setIsBottomN(boolean isBottomN) {
+    this.isBottomN = isBottomN;
+  }
+
+  public Function getTopNFunction() {
+    return topNFunction;
+  }
+
+  public void setTopNFunction(Function topNFunction) {
+    this.topNFunction = topNFunction;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
new file mode 100644
index 0000000..52abc1e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/DefaultFSSinkProvider.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Date;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+public class DefaultFSSinkProvider implements ExternalSinkProvider {
+  private static final Log LOG = 
LogFactory.getLog(DefaultFSSinkProvider.class);
+  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+  private final DefaultExternalMetricsSink sink = new 
DefaultExternalMetricsSink();
+  private long FIXED_FILE_SIZE;
+  private final String SINK_FILE_NAME = "external-metrics-sink.dat";
+  private final String SEPARATOR = ", ";
+  private final String LINE_SEP = System.lineSeparator();
+  private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, 
START_TIME, DATA";
+
+  public DefaultFSSinkProvider() {
+    try {
+      FIXED_FILE_SIZE = 
conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize",
 FileUtils.ONE_MB * 100);
+    } catch (Exception ignored) {
+      FIXED_FILE_SIZE = FileUtils.ONE_MB * 100;
+    }
+  }
+
+  @Override
+  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+    return sink;
+  }
+
+  class DefaultExternalMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      return 10;
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      try {
+        return 
conf.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
 3);
+      } catch (Exception e) {
+        LOG.warn("Cannot read cache commit interval.");
+      }
+      return 3;
+    }
+
+    private boolean createFile(File f) {
+      boolean created = false;
+      if (!f.exists()) {
+        try {
+          created = f.createNewFile();
+          FileUtils.writeStringToFile(f, HEADERS);
+        } catch (IOException e) {
+          LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath());
+          return false;
+        }
+      }
+
+      return created;
+    }
+
+    private boolean shouldReCreate(File f) {
+      if (!f.exists()) {
+        return true;
+      }
+      if (FileUtils.sizeOf(f) > FIXED_FILE_SIZE) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      String dirPath = 
TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir();
+      File dir = new File(dirPath);
+      if (!dir.exists()) {
+        LOG.error("Cannot sink data to file system, incorrect dir path " + 
dirPath);
+        return;
+      }
+
+      File f = FileUtils.getFile(dirPath, SINK_FILE_NAME);
+      if (shouldReCreate(f)) {
+        if (!f.delete()) {
+          LOG.warn("Unable to delete external sink file.");
+          return;
+        }
+        createFile(f);
+      }
+
+      if (metrics != null) {
+        for (TimelineMetrics timelineMetrics : metrics) {
+          for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+            StringBuilder sb = new StringBuilder();
+            sb.append(metric.getMetricName());
+            sb.append(SEPARATOR);
+            sb.append(metric.getAppId());
+            sb.append(SEPARATOR);
+            if (StringUtils.isEmpty(metric.getInstanceId())) {
+              sb.append(SEPARATOR);
+            } else {
+              sb.append(metric.getInstanceId());
+              sb.append(SEPARATOR);
+            }
+            if (StringUtils.isEmpty(metric.getHostName())) {
+              sb.append(SEPARATOR);
+            } else {
+              sb.append(metric.getHostName());
+              sb.append(SEPARATOR);
+            }
+            sb.append(new Date(metric.getStartTime()));
+            sb.append(SEPARATOR);
+            sb.append(metric.getMetricValues().toString());
+            sb.append(LINE_SEP);
+            try {
+              FileUtils.writeStringToFile(f, sb.toString());
+            } catch (IOException e) {
+              LOG.warn("Unable to sink data to file " + f.getPath());
+            }
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java
new file mode 100644
index 0000000..b2bb908
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalMetricsSink.java
@@ -0,0 +1,48 @@
+package org.apache.ambari.metrics.core.timeline.sink;
+
+import java.util.Collection;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
public interface ExternalMetricsSink {
  /**
   * How many seconds to wait on sink before dropping metrics.
   * Note: Care should be taken that this timeout does not bottleneck the
   * sink thread.
   *
   * @return timeout in seconds
   */
  int getSinkTimeOutSeconds();

  /**
   * How frequently to flush data to external system.
   * Default would be between 60 - 120 seconds, coherent with default sink
   * interval of AMS.
   *
   * @return flush interval in seconds
   */
  int getFlushSeconds();

  /**
   * Raw data stream to process / store on external system.
   * The data will be held in an in-memory cache and flushed at flush seconds
   * or when the cache size limit is exceeded we will flush the cache and
   * drop data if write fails.
   *
   * @param metrics a {@link Collection} of {@link TimelineMetrics} to sink
   */
  void sinkMetricData(Collection<TimelineMetrics> metrics);
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java
new file mode 100644
index 0000000..83cdc33
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/ExternalSinkProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.sink;
+
+import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
+
+
+/**
+ * Configurable provider for sink classes that match the metrics sources.
+ * Provider can return same sink or different sinks for each source.
+ */
public interface ExternalSinkProvider {

  /**
   * Return an instance of the metrics sink for the given source.
   *
   * @param sourceName the internal source whose data the sink will receive
   * @return {@link ExternalMetricsSink}
   */
  ExternalMetricsSink getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName);
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java
new file mode 100644
index 0000000..cbb0834
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/HttpSinkProvider.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.sink;
+
+import static 
org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.KeyStore;
+import java.util.Collection;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
+import org.apache.http.client.utils.URIBuilder;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+public class HttpSinkProvider implements ExternalSinkProvider {
+  private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class);
+  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+
+  private String connectUrl;
+  private SSLSocketFactory sslSocketFactory;
+  protected static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  public HttpSinkProvider() {
+    Configuration config;
+    try {
+      config = conf.getMetricsConf();
+    } catch (Exception e) {
+      throw new ExceptionInInitializerError("Unable to read configuration for 
sink.");
+    }
+    String protocol = 
config.get("timeline.metrics.service.external.http.sink.protocol", "http");
+    String host = 
config.get("timeline.metrics.service.external.http.sink.host", "localhost");
+    String port = 
config.get("timeline.metrics.service.external.http.sink.port", "6189");
+
+    if (protocol.contains("https")) {
+      loadTruststore(
+        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"),
+        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"),
+        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password")
+      );
+    }
+
+    URIBuilder uriBuilder = new URIBuilder();
+    uriBuilder.setScheme(protocol);
+    uriBuilder.setHost(host);
+    uriBuilder.setPort(Integer.parseInt(port));
+    connectUrl = uriBuilder.toString();
+  }
+
+  @Override
+  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+    return new DefaultHttpMetricsSink();
+  }
+
+  protected HttpURLConnection getConnection(String spec) throws IOException {
+    return (HttpURLConnection) new URL(spec).openConnection();
+  }
+
+  // Get an ssl connection
+  protected HttpsURLConnection getSSLConnection(String spec)
+    throws IOException, IllegalStateException {
+
+    HttpsURLConnection connection = (HttpsURLConnection) (new 
URL(spec).openConnection());
+    connection.setSSLSocketFactory(sslSocketFactory);
+    return connection;
+  }
+
+  protected void loadTruststore(String trustStorePath, String trustStoreType,
+                                String trustStorePassword) {
+    if (sslSocketFactory == null) {
+      if (trustStorePath == null || trustStorePassword == null) {
+        String msg = "Can't load TrustStore. Truststore path or password is 
not set.";
+        LOG.error(msg);
+        throw new IllegalStateException(msg);
+      }
+      FileInputStream in = null;
+      try {
+        in = new FileInputStream(new File(trustStorePath));
+        KeyStore store = KeyStore.getInstance(trustStoreType == null ?
+          KeyStore.getDefaultType() : trustStoreType);
+        store.load(in, trustStorePassword.toCharArray());
+        TrustManagerFactory tmf = TrustManagerFactory
+          .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(store);
+        SSLContext context = SSLContext.getInstance("TLS");
+        context.init(null, tmf.getTrustManagers(), null);
+        sslSocketFactory = context.getSocketFactory();
+      } catch (Exception e) {
+        LOG.error("Unable to load TrustStore", e);
+      } finally {
+        if (in != null) {
+          try {
+            in.close();
+          } catch (IOException e) {
+            LOG.error("Unable to load TrustStore", e);
+          }
+        }
+      }
+    }
+  }
+
+  class DefaultHttpMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      try {
+        return 
conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds",
 10);
+      } catch (Exception e) {
+        return 10;
+      }
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      try {
+        return 
conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+      } catch (Exception e) {
+        LOG.warn("Cannot read cache commit interval.");
+      }
+      return 3;
+    }
+
+    /**
+     * Cleans up and closes an input stream
+     * see 
http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+     * @param is the InputStream to clean up
+     * @return string read from the InputStream
+     * @throws IOException
+     */
+    protected String cleanupInputStream(InputStream is) throws IOException {
+      StringBuilder sb = new StringBuilder();
+      if (is != null) {
+        try (
+          InputStreamReader isr = new InputStreamReader(is);
+          BufferedReader br = new BufferedReader(isr)
+        ) {
+          // read the response body
+          String line;
+          while ((line = br.readLine()) != null) {
+            if (LOG.isDebugEnabled()) {
+              sb.append(line);
+            }
+          }
+        } finally {
+          is.close();
+        }
+      }
+      return sb.toString();
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      HttpURLConnection connection = null;
+      try {
+        connection = connectUrl.startsWith("https") ? 
getSSLConnection(connectUrl) : getConnection(connectUrl);
+
+        connection.setRequestMethod("POST");
+        connection.setRequestProperty("Content-Type", "application/json");
+        connection.setRequestProperty("Connection", "Keep-Alive");
+        connection.setConnectTimeout(getSinkTimeOutSeconds());
+        connection.setReadTimeout(getSinkTimeOutSeconds());
+        connection.setDoOutput(true);
+
+        if (metrics != null) {
+          String jsonData = mapper.writeValueAsString(metrics);
+          try (OutputStream os = connection.getOutputStream()) {
+            os.write(jsonData.getBytes("UTF-8"));
+          }
+        }
+
+        int statusCode = connection.getResponseCode();
+
+        if (statusCode != 200) {
+          LOG.info("Unable to POST metrics to external sink, " + connectUrl +
+            ", statusCode = " + statusCode);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Metrics posted to external sink " + connectUrl);
+          }
+        }
+        cleanupInputStream(connection.getInputStream());
+
+      } catch (IOException io) {
+        LOG.warn("Unable to sink data to external system.", io);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
new file mode 100644
index 0000000..1ce624b
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/sink/KafkaSinkProvider.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.sink;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/*
+  This will be used by the single Metrics committer thread. Hence it is
+  important to make this non-blocking export.
+ */
+public class KafkaSinkProvider implements ExternalSinkProvider {
+  private static String TOPIC_NAME = "ambari-metrics-topic";
+  private static final Log LOG = LogFactory.getLog(KafkaSinkProvider.class);
+
+  private Producer producer;
+  private int TIMEOUT_SECONDS = 10;
+  private int FLUSH_SECONDS = 3;
+
+  ObjectMapper objectMapper = new ObjectMapper();
+
+  public KafkaSinkProvider() {
+    TimelineMetricConfiguration configuration = 
TimelineMetricConfiguration.getInstance();
+
+    Properties configProperties = new Properties();
+    try {
+      configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_SERVERS));
+      configProperties.put(ProducerConfig.ACKS_CONFIG, 
configuration.getMetricsConf().getTrimmed(TimelineMetricConfiguration.KAFKA_ACKS,
 "all"));
+      // Avoid duplicates - No transactional semantics
+      configProperties.put(ProducerConfig.RETRIES_CONFIG, 
configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_RETRIES,
 0));
+      configProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 
configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_BATCH_SIZE,
 128));
+      configProperties.put(ProducerConfig.LINGER_MS_CONFIG, 
configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_LINGER_MS,
 1));
+      configProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 
configuration.getMetricsConf().getLong(TimelineMetricConfiguration.KAFKA_BUFFER_MEM,
 33554432)); // 32 MB
+      FLUSH_SECONDS = 
configuration.getMetricsConf().getInt(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL,
 3);
+      TIMEOUT_SECONDS = 
configuration.getMetricsConf().getInt(TimelineMetricConfiguration.KAFKA_SINK_TIMEOUT_SECONDS,
 10);
+    } catch (Exception e) {
+      LOG.error("Configuration error!", e);
+      throw new ExceptionInInitializerError(e);
+    }
+    
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
+    
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.connect.json.JsonSerializer");
+
+    
+
+    producer = new KafkaProducer(configProperties);
+  }
+
+  @Override
+  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+    switch (sourceName) {
+      case RAW_METRICS:
+        return new KafkaRawMetricsSink();
+      default:
+        throw new UnsupportedOperationException("Provider does not support " +
+          "the expected source " + sourceName);
+    }
+  }
+
+  class KafkaRawMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      return TIMEOUT_SECONDS;
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      return FLUSH_SECONDS;
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      JsonNode jsonNode = objectMapper.valueToTree(metrics);
+      ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, 
JsonNode>(TOPIC_NAME, jsonNode);
+      Future<RecordMetadata> f = producer.send(rec);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java
new file mode 100644
index 0000000..1287328
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/DefaultInternalMetricsSourceProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.source;
+
+import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class DefaultInternalMetricsSourceProvider implements 
InternalSourceProvider {
+  private static final Log LOG = 
LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
+
+  // TODO: Implement read based sources for higher order data
+  @Override
+  public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME 
sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
+    if (sink == null) {
+      LOG.warn("No external sink configured for source " + sourceName);
+      return null;
+    }
+
+    switch (sourceName) {
+      case RAW_METRICS:
+        return new RawMetricsSource(sinkIntervalSeconds, sink);
+      default:
+        throw new UnsupportedOperationException("Unimplemented source type " + 
sourceName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java
new file mode 100644
index 0000000..5aaf128
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalMetricsSource.java
@@ -0,0 +1,30 @@
+package org.apache.ambari.metrics.core.timeline.source;
+
+import java.util.Collection;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
/**
 * A consumer-side source of internal metrics data: implementations accept
 * batches of {@link TimelineMetrics} and forward them to an external sink
 * (RawMetricsSource is the reference implementation in this package).
 */
public interface InternalMetricsSource {
  /**
   * Write metrics to external sink.
   * Allows pre-processing and caching capabilities to the consumer.
   *
   * @param metrics batch of timeline metrics to publish
   */
  void publishTimelineMetrics(Collection<TimelineMetrics> metrics);
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java
new file mode 100644
index 0000000..3bd8e42
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/InternalSourceProvider.java
@@ -0,0 +1,39 @@
+package org.apache.ambari.metrics.core.timeline.source;
+
+import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
/**
 * Factory for {@link InternalMetricsSource} instances, keyed by the type of
 * metrics series (raw, or a host/cluster aggregate at a given resolution).
 */
public interface InternalSourceProvider {

  // NOTE(review): "AGGREAGATE" is a typo for "AGGREGATE"; these constant names
  // are referenced by implementations, so renaming them would be a breaking change.
  enum SOURCE_NAME {
    RAW_METRICS,
    MINUTE_HOST_AGGREAGATE_METRICS,
    HOURLY_HOST_AGGREAGATE_METRICS,
    DAILY_HOST_AGGREAGATE_METRICS,
    MINUTE_CLUSTER_AGGREAGATE_METRICS,
    HOURLY_CLUSTER_AGGREAGATE_METRICS,
    DAILY_CLUSTER_AGGREAGATE_METRICS,
  }

  /**
   * Provide Source for metrics data.
   *
   * @param sourceName which metrics series the source will serve
   * @param sinkIntervalSeconds interval at which the sink expects to receive data
   * @param sink external sink the returned source should publish to
   * @return {@link InternalMetricsSource}
   */
  InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink);
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java
new file mode 100644
index 0000000..9dddb33
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSource.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.source;
+
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
+import 
org.apache.ambari.metrics.core.timeline.source.cache.InternalMetricsCacheProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import 
org.apache.ambari.metrics.core.timeline.source.cache.InternalMetricsCache;
+
+public class RawMetricsSource implements InternalMetricsSource {
+  private static final Log LOG = LogFactory.getLog(RawMetricsSource.class);
+  private final int internalCacheInterval;
+  private final ExternalMetricsSink rawMetricsSink;
+  private final ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+  private final InternalMetricsCache cache;
+  static final String RAW_METRICS_CACHE = "RAW_METRICS_CACHE_INSTANCE";
+
+  public RawMetricsSource(int internalCacheInterval, ExternalMetricsSink 
rawMetricsSink) {
+    this.internalCacheInterval = internalCacheInterval;
+    this.rawMetricsSink = rawMetricsSink;
+    this.cache = 
InternalMetricsCacheProvider.getInstance().getCacheInstance(RAW_METRICS_CACHE);
+    if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+      initializeFixedRateScheduler();
+    }
+  }
+
+  @Override
+  public void publishTimelineMetrics(Collection<TimelineMetrics> metrics) {
+    // TODO: Adjust default flush to reasonable defaults > 3 seconds
+    if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+      // Need to cache only if external sink cannot keep up and thereby has
+      // different flush interval as compared to HBase flush
+      cache.putAll(metrics); // Scheduler initialized already for flush
+    } else {
+      submitDataWithTimeout(metrics);
+    }
+  }
+
+  private void initializeFixedRateScheduler() {
+    executorService.scheduleAtFixedRate(() -> 
rawMetricsSink.sinkMetricData(cache.evictAll()),
+      rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), 
TimeUnit.SECONDS);
+  }
+
+  private void submitDataWithTimeout(final Collection<TimelineMetrics> 
metrics) {
+    Future f = executorService.submit(() -> {
+      rawMetricsSink.sinkMetricData(metrics);
+      return null;
+    });
+    try {
+      f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Raw metrics sink interrupted.");
+    } catch (ExecutionException e) {
+      LOG.warn("Exception on sinking metrics", e);
+    } catch (TimeoutException e) {
+      LOG.warn("Timeout exception on sinking metrics", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java
new file mode 100644
index 0000000..bfb610f
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheKey.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.source.cache;
+
+public class InternalMetricCacheKey {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostname;
+  private long startTime; // Useful for debugging
+
+  public InternalMetricCacheKey(String metricName, String appId, String 
instanceId, String hostname, long startTime) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.hostname = hostname;
+    this.startTime = startTime;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    InternalMetricCacheKey that = (InternalMetricCacheKey) o;
+
+    if (!getMetricName().equals(that.getMetricName())) return false;
+    if (!getAppId().equals(that.getAppId())) return false;
+    if (getInstanceId() != null ? 
!getInstanceId().equals(that.getInstanceId()) : that.getInstanceId() != null)
+      return false;
+    return getHostname() != null ? getHostname().equals(that.getHostname()) : 
that.getHostname() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getMetricName().hashCode();
+    result = 31 * result + getAppId().hashCode();
+    result = 31 * result + (getInstanceId() != null ? 
getInstanceId().hashCode() : 0);
+    result = 31 * result + (getHostname() != null ? getHostname().hashCode() : 
0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "InternalMetricCacheKey{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", hostname='" + hostname + '\'' +
+      ", startTime=" + startTime +
+      '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java
new file mode 100644
index 0000000..6573c4e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/source/cache/InternalMetricCacheValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.source.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+
/**
 * Value stored in the internal metrics cache for one series: a
 * timestamp-to-value map kept sorted by timestamp.
 */
public class InternalMetricCacheValue {
  private TreeMap<Long, Double> metricValues = new TreeMap<>();

  /** @return the live (mutable) sorted timestamp-to-value map */
  public TreeMap<Long, Double> getMetricValues() {
    return metricValues;
  }

  /** Replaces the backing map wholesale. */
  public void setMetricValues(TreeMap<Long, Double> metricValues) {
    this.metricValues = metricValues;
  }

  /** Merges the given datapoints; existing timestamps are overwritten. */
  public void addMetricValues(Map<Long, Double> newValues) {
    metricValues.putAll(newValues);
  }
}

Reply via email to