http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
deleted file mode 100644
index 24239a0..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- *
- */
-public interface ConnectionProvider {
-  public Connection getConnection() throws SQLException;
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
deleted file mode 100644
index a88f44e..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultCondition.java
+++ /dev/null
@@ -1,421 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-
-public class DefaultCondition implements Condition {
-  List<String> metricNames;
-  List<String> hostnames;
-  String appId;
-  String instanceId;
-  Long startTime;
-  Long endTime;
-  Precision precision;
-  Integer limit;
-  boolean grouped;
-  boolean noLimit = false;
-  Integer fetchSize;
-  String statement;
-  Set<String> orderByColumns = new LinkedHashSet<String>();
-  boolean metricNamesNotCondition = false;
-  boolean hostNamesNotCondition = false;
-  boolean uuidNotCondition = false;
-  List<byte[]> uuids = new ArrayList<>();
-
-  private static final Log LOG = LogFactory.getLog(DefaultCondition.class);
-
-  public DefaultCondition(List<String> metricNames, List<String> hostnames, 
String appId,
-                          String instanceId, Long startTime, Long endTime, 
Precision precision,
-                          Integer limit, boolean grouped) {
-    this.metricNames = metricNames;
-    this.hostnames = hostnames;
-    this.appId = appId;
-    this.instanceId = instanceId;
-    this.startTime = startTime;
-    this.endTime = endTime;
-    this.precision = precision;
-    this.limit = limit;
-    this.grouped = grouped;
-  }
-
-  public DefaultCondition(List<byte[]> uuids, List<String> metricNames, 
List<String> hostnames, String appId,
-                          String instanceId, Long startTime, Long endTime, 
Precision precision,
-                          Integer limit, boolean grouped) {
-    this.uuids = uuids;
-    this.metricNames = metricNames;
-    this.hostnames = hostnames;
-    this.appId = appId;
-    this.instanceId = instanceId;
-    this.startTime = startTime;
-    this.endTime = endTime;
-    this.precision = precision;
-    this.limit = limit;
-    this.grouped = grouped;
-  }
-
-  public String getStatement() {
-    return statement;
-  }
-
-  public void setStatement(String statement) {
-    this.statement = statement;
-  }
-
-  public List<String> getMetricNames() {
-    return metricNames == null || metricNames.isEmpty() ? null : metricNames;
-  }
-
-  public StringBuilder getConditionClause() {
-    StringBuilder sb = new StringBuilder();
-    boolean appendConjunction = appendUuidClause(sb);
-    appendConjunction = append(sb, appendConjunction, getStartTime(), " 
SERVER_TIME >= ?");
-    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
-    return sb;
-  }
-
-  protected static boolean append(StringBuilder sb,
-                                  boolean appendConjunction,
-                                  Object value, String str) {
-    if (value != null) {
-      if (appendConjunction) {
-        sb.append(" AND");
-      }
-
-      sb.append(str);
-      appendConjunction = true;
-    }
-    return appendConjunction;
-  }
-
-  public List<String> getHostnames() {
-    return hostnames;
-  }
-
-  public Precision getPrecision() {
-    return precision;
-  }
-
-  public void setPrecision(Precision precision) {
-    this.precision = precision;
-  }
-
-  public String getAppId() {
-    if (appId != null && !appId.isEmpty()) {
-      if (!(appId.equals("HOST") || appId.equals("FLUME_HANDLER"))) {
-        return appId.toLowerCase();
-      } else {
-        return appId;
-      }
-    }
-    return null;
-  }
-
-  public String getInstanceId() {
-    return instanceId == null || instanceId.isEmpty() ? null : instanceId;
-  }
-
-  /**
-   * Convert to millis.
-   */
-  public Long getStartTime() {
-    if (startTime == null) {
-      return null;
-    } else if (startTime < 9999999999l) {
-      return startTime * 1000;
-    } else {
-      return startTime;
-    }
-  }
-
-  public Long getEndTime() {
-    if (endTime == null) {
-      return null;
-    }
-    if (endTime < 9999999999l) {
-      return endTime * 1000;
-    } else {
-      return endTime;
-    }
-  }
-
-  public void setNoLimit() {
-    this.noLimit = true;
-  }
-
-  @Override
-  public boolean doUpdate() {
-    return false;
-  }
-
-  public Integer getLimit() {
-    if (noLimit) {
-      return null;
-    }
-    return limit == null ? PhoenixHBaseAccessor.RESULTSET_LIMIT : limit;
-  }
-
-  public boolean isGrouped() {
-    return grouped;
-  }
-
-  public boolean isPointInTime() {
-    return getStartTime() == null && getEndTime() == null;
-  }
-
-  public boolean isEmpty() {
-    return (metricNames == null || metricNames.isEmpty())
-      && (hostnames == null || hostnames.isEmpty())
-      && (appId == null || appId.isEmpty())
-      && (instanceId == null || instanceId.isEmpty())
-      && startTime == null
-      && endTime == null;
-  }
-
-  public Integer getFetchSize() {
-    return fetchSize;
-  }
-
-  public void setFetchSize(Integer fetchSize) {
-    this.fetchSize = fetchSize;
-  }
-
-  public void addOrderByColumn(String column) {
-    orderByColumns.add(column);
-  }
-
-  public String getOrderByClause(boolean asc) {
-    String orderByStr = " ORDER BY ";
-    if (!orderByColumns.isEmpty()) {
-      StringBuilder sb = new StringBuilder(orderByStr);
-      for (String orderByColumn : orderByColumns) {
-        if (sb.length() != orderByStr.length()) {
-          sb.append(", ");
-        }
-        sb.append(orderByColumn);
-        if (!asc) {
-          sb.append(" DESC");
-        }
-      }
-      sb.append(" ");
-      return sb.toString();
-    }
-    return null;
-  }
-
-  protected boolean appendUuidClause(StringBuilder sb) {
-    boolean appendConjunction = false;
-
-    if (CollectionUtils.isNotEmpty(uuids)) {
-
-      List<byte[]> uuidsHost = new ArrayList<>();
-      List<byte[]> uuidsMetric = new ArrayList<>();
-      List<byte[]> uuidsFull = new ArrayList<>();
-
-      if (getUuids() != null) {
-        for (byte[] uuid : uuids) {
-          if (uuid.length == 
TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) {
-            uuidsMetric.add(uuid);
-          } else if (uuid.length == 
TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH) {
-            uuidsHost.add(uuid);
-          } else {
-            uuidsFull.add(uuid);
-          }
-        }
-
-        // Put a '(' first
-        sb.append("(");
-
-        //IN clause
-        // METRIC_NAME (NOT) IN (?,?,?,?)
-        if (CollectionUtils.isNotEmpty(uuidsFull)) {
-          sb.append("UUID");
-          if (uuidNotCondition) {
-            sb.append(" NOT");
-          }
-          sb.append(" IN (");
-          //Append ?,?,?,?
-          for (int i = 0; i < uuidsFull.size(); i++) {
-            sb.append("?");
-            if (i < uuidsFull.size() - 1) {
-              sb.append(", ");
-            }
-          }
-          sb.append(")");
-          appendConjunction = true;
-        }
-
-        //Put an AND if both types are present
-        if (CollectionUtils.isNotEmpty(uuidsFull) &&
-          CollectionUtils.isNotEmpty(uuidsMetric)) {
-          sb.append(" AND ");
-        }
-
-        // ( for OR
-        if (!metricNamesNotCondition && uuidsMetric.size() > 1 && 
(CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsHost))) {
-          sb.append("(");
-        }
-
-        //LIKE clause for clusterMetric UUIDs
-        // UUID (NOT) LIKE ? OR(AND) UUID LIKE ?
-        if (CollectionUtils.isNotEmpty(uuidsMetric)) {
-
-          for (int i = 0; i < uuidsMetric.size(); i++) {
-            sb.append("UUID");
-            if (metricNamesNotCondition) {
-              sb.append(" NOT");
-            }
-            sb.append(" LIKE ");
-            sb.append("?");
-
-            if (i < uuidsMetric.size() - 1) {
-              if (metricNamesNotCondition) {
-                sb.append(" AND ");
-              } else {
-                sb.append(" OR ");
-              }
-            // ) for OR
-            } else if ((CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsHost)) && !metricNamesNotCondition && 
uuidsMetric.size() > 1) {
-              sb.append(")");
-            }
-          }
-          appendConjunction = true;
-        }
-
-        //Put an AND if both types are present
-        if ((CollectionUtils.isNotEmpty(uuidsMetric) || 
(CollectionUtils.isNotEmpty(uuidsFull) && CollectionUtils.isEmpty(uuidsMetric)))
-          && CollectionUtils.isNotEmpty(uuidsHost)) {
-          sb.append(" AND ");
-        }
-        // ( for OR
-        if((CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && 
uuidsHost.size() > 1){
-          sb.append("(");
-        }
-
-        //LIKE clause for HOST UUIDs
-        // UUID (NOT) LIKE ? OR(AND) UUID LIKE ?
-        if (CollectionUtils.isNotEmpty(uuidsHost)) {
-
-          for (int i = 0; i < uuidsHost.size(); i++) {
-            sb.append("UUID");
-            if (hostNamesNotCondition) {
-              sb.append(" NOT");
-            }
-            sb.append(" LIKE ");
-            sb.append("?");
-
-            if (i < uuidsHost.size() - 1) {
-              if (hostNamesNotCondition) {
-                sb.append(" AND ");
-              } else {
-                sb.append(" OR ");
-              }
-            // ) for OR
-            } else if ((CollectionUtils.isNotEmpty(uuidsFull) || 
CollectionUtils.isNotEmpty(uuidsMetric)) && !hostNamesNotCondition && 
uuidsHost.size() > 1) {
-              sb.append(")");
-            }
-          }
-          appendConjunction = true;
-        }
-
-        // Finish with a ')'
-        if (appendConjunction) {
-          sb.append(")");
-        }
-
-        uuids = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(uuidsFull)) {
-          uuids.addAll(uuidsFull);
-        }
-        for (byte[] uuid: uuidsMetric) {
-          uuids.add(new String(uuid).concat("%").getBytes());
-        }
-        for (byte[] uuid: uuidsHost) {
-          uuids.add("%".concat(new String(uuid)).getBytes());
-        }
-      }
-    }
-
-    return appendConjunction;
-  }
-
-  @Override
-  public String toString() {
-    return "Condition{" +
-      "uuids=" + uuids +
-      ", appId='" + appId + '\'' +
-      ", instanceId='" + instanceId + '\'' +
-      ", startTime=" + startTime +
-      ", endTime=" + endTime +
-      ", limit=" + limit +
-      ", grouped=" + grouped +
-      ", orderBy=" + orderByColumns +
-      ", noLimit=" + noLimit +
-      '}';
-  }
-
-  protected static boolean metricNamesHaveWildcard(List<String> metricNames) {
-    for (String name : metricNames) {
-      if (name.contains("%")) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  protected static boolean hostNamesHaveWildcard(List<String> hostnames) {
-    if (hostnames == null)
-      return false;
-    for (String name : hostnames) {
-      if (name.contains("%")) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void setMetricNamesNotCondition(boolean metricNamesNotCondition) {
-    this.metricNamesNotCondition = metricNamesNotCondition;
-  }
-
-  @Override
-  public void setHostnamesNotCondition(boolean hostNamesNotCondition) {
-    this.hostNamesNotCondition = hostNamesNotCondition;
-  }
-
-  @Override
-  public void setUuidNotCondition(boolean uuidNotCondition) {
-    this.uuidNotCondition = uuidNotCondition;
-  }
-
-  @Override
-  public List<byte[]> getUuids() {
-    return uuids;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
deleted file mode 100644
index 78fad62..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
-
-  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
-  private static final String ZOOKEEPER_CLIENT_PORT = 
"hbase.zookeeper.property.clientPort";
-  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
-  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
-
-  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
-  private final String url;
-
-  private Configuration hbaseConf;
-
-  public DefaultPhoenixDataSource(Configuration hbaseConf) {
-    this.hbaseConf = hbaseConf;
-    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, 
"2181");
-    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
-    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, 
"/ams-hbase-unsecure");
-    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
-      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
-        "access HBase store using Phoenix.");
-    }
-
-    url = String.format(connectionUrl,
-      zookeeperQuorum,
-      zookeeperClientPort,
-      znodeParent);
-  }
-
-  /**
-   * Get HBaseAdmin for table ops.
-   * @return @HBaseAdmin
-   * @throws IOException
-   */
-  public HBaseAdmin getHBaseAdmin() throws IOException {
-    return (HBaseAdmin) 
ConnectionFactory.createConnection(hbaseConf).getAdmin();
-  }
-
-  /**
-   * Get JDBC connection to HBase store. Assumption is that the hbase
-   * configuration is present on the classpath and loaded by the caller into
-   * the Configuration object.
-   * Phoenix already caches the HConnection between the client and HBase
-   * cluster.
-   *
-   * @return @java.sql.Connection
-   */
-  public Connection getConnection() throws SQLException {
-
-    LOG.debug("Metric store connection url: " + url);
-    try {
-      return DriverManager.getConnection(url);
-    } catch (SQLException e) {
-      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-
-      throw e;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
deleted file mode 100644
index 5d1e244..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/EmptyCondition.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import java.util.List;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-
-/**
- * Encapsulate a Condition with pre-formatted and pre-parsed query string.
- */
-public class EmptyCondition implements Condition {
-  String statement;
-  boolean doUpdate = false;
-  boolean metricNamesNotCondition = false;
-
-  @Override
-  public boolean isEmpty() {
-    return false;
-  }
-
-  @Override
-  public List<byte[]> getUuids() {
-    return null;
-  }
-
-  @Override
-  public List<String> getMetricNames() {
-    return null;
-  }
-
-  @Override
-  public boolean isPointInTime() {
-    return false;
-  }
-
-  @Override
-  public boolean isGrouped() {
-    return true;
-  }
-
-  @Override
-  public void setStatement(String statement) {
-    this.statement = statement;
-  }
-
-  @Override
-  public List<String> getHostnames() {
-    return null;
-  }
-
-  @Override
-  public Precision getPrecision() {
-    return null;
-  }
-
-  @Override
-  public void setPrecision(Precision precision) {
-
-  }
-
-  @Override
-  public String getAppId() {
-    return null;
-  }
-
-  @Override
-  public String getInstanceId() {
-    return null;
-  }
-
-  @Override
-  public StringBuilder getConditionClause() {
-    return null;
-  }
-
-  @Override
-  public String getOrderByClause(boolean asc) {
-    return null;
-  }
-
-  @Override
-  public String getStatement() {
-    return statement;
-  }
-
-  @Override
-  public Long getStartTime() {
-    return null;
-  }
-
-  @Override
-  public Long getEndTime() {
-    return null;
-  }
-
-  @Override
-  public Integer getLimit() {
-    return null;
-  }
-
-  @Override
-  public Integer getFetchSize() {
-    return null;
-  }
-
-  @Override
-  public void setFetchSize(Integer fetchSize) {
-
-  }
-
-  @Override
-  public void addOrderByColumn(String column) {
-
-  }
-
-  @Override
-  public void setNoLimit() {
-
-  }
-
-  public void setDoUpdate(boolean doUpdate) {
-    this.doUpdate = doUpdate;
-  }
-
-  @Override
-  public boolean doUpdate() {
-    return doUpdate;
-  }
-
-  @Override
-  public String toString() {
-    return "EmptyCondition{ " +
-      " statement = " + this.getStatement() +
-      " doUpdate = " + this.doUpdate() +
-      " }";
-  }
-
-  @Override
-  public void setMetricNamesNotCondition(boolean metricNamesNotCondition) {
-    this.metricNamesNotCondition = metricNamesNotCondition;
-  }
-
-  @Override
-  public void setHostnamesNotCondition(boolean hostNamesNotCondition) {
-    throw new NotImplementedException("Not implemented");
-  }
-
-  @Override
-  public void setUuidNotCondition(boolean uuidNotCondition) {
-    throw new NotImplementedException("Not implemented");
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
deleted file mode 100644
index cdb3b4e..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.client.Admin;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface PhoenixConnectionProvider extends ConnectionProvider {
-  /**
-   * Get HBaseAdmin for the Phoenix connection
-   * @return
-   * @throws IOException
-   */
-  Admin getHBaseAdmin() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
deleted file mode 100644
index 457f5af..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ /dev/null
@@ -1,804 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import 
org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-
-/**
- * Encapsulate all metrics related SQL queries.
- */
-public class PhoenixTransactSQL {
-
-  public static final Log LOG = LogFactory.getLog(PhoenixTransactSQL.class);
-
-  /**
-   * Create table to store individual metric records.
-   */
-  public static final String CREATE_METRICS_TABLE_SQL = "CREATE TABLE IF NOT " 
+
-    "EXISTS METRIC_RECORD (UUID BINARY(20) NOT NULL, " +
-    "SERVER_TIME BIGINT NOT NULL, " +
-    "METRIC_SUM DOUBLE, " +
-    "METRIC_COUNT UNSIGNED_INT, " +
-    "METRIC_MAX DOUBLE, " +
-    "METRIC_MIN DOUBLE, " +
-    "METRICS VARCHAR CONSTRAINT pk " +
-    "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) DATA_BLOCK_ENCODING='%s', 
IMMUTABLE_ROWS=true, " +
-    "TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_CONTAINER_METRICS_TABLE_SQL =
-      "CREATE TABLE IF NOT EXISTS CONTAINER_METRICS "
-      + "(APP_ID VARCHAR, "
-      + " CONTAINER_ID VARCHAR,"
-      + " START_TIME TIMESTAMP,"
-      + " FINISH_TIME TIMESTAMP, "
-      + " DURATION BIGINT,"
-      + " HOSTNAME VARCHAR,"
-      + " EXIT_CODE INTEGER,"
-      + " LOCALIZATION_DURATION BIGINT,"
-      + " LAUNCH_DURATION BIGINT,"
-      + " MEM_REQUESTED_GB DOUBLE,"
-      + " MEM_REQUESTED_GB_MILLIS DOUBLE,"
-      + " MEM_VIRTUAL_GB DOUBLE,"
-      + " MEM_USED_GB_MIN DOUBLE,"
-      + " MEM_USED_GB_MAX DOUBLE,"
-      + " MEM_USED_GB_AVG DOUBLE,"
-      + " MEM_USED_GB_50_PCT DOUBLE,"
-      + " MEM_USED_GB_75_PCT DOUBLE,"
-      + " MEM_USED_GB_90_PCT DOUBLE,"
-      + " MEM_USED_GB_95_PCT DOUBLE,"
-      + " MEM_USED_GB_99_PCT DOUBLE,"
-      + " MEM_UNUSED_GB DOUBLE,"
-      + " MEM_UNUSED_GB_MILLIS DOUBLE "
-      + " CONSTRAINT pk PRIMARY KEY(APP_ID, CONTAINER_ID)) 
DATA_BLOCK_ENCODING='%s',"
-      + " IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_AGGREGATE_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS %s " +
-      "(UUID BINARY(20) NOT NULL, " +
-      "SERVER_TIME BIGINT NOT NULL, " +
-      "METRIC_SUM DOUBLE," +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE," +
-      "METRIC_MIN DOUBLE CONSTRAINT pk " +
-      "PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) 
DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, TTL=%s," +
-      " COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS %s " +
-      "(UUID BINARY(16) NOT NULL, " +
-      "SERVER_TIME BIGINT NOT NULL, " +
-      "METRIC_SUM DOUBLE, " +
-      "HOSTS_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) 
DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
-
-  // HOSTS_COUNT vs METRIC_COUNT
-  public static final String 
CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS %s " +
-      "(UUID BINARY(16) NOT NULL, " +
-      "SERVER_TIME BIGINT NOT NULL, " +
-      "METRIC_SUM DOUBLE, " +
-      "METRIC_COUNT UNSIGNED_INT, " +
-      "METRIC_MAX DOUBLE, " +
-      "METRIC_MIN DOUBLE " +
-      "CONSTRAINT pk PRIMARY KEY (UUID, SERVER_TIME ROW_TIMESTAMP)) 
DATA_BLOCK_ENCODING='%s', IMMUTABLE_ROWS=true, " +
-      "TTL=%s, COMPRESSION='%s'";
-
-  public static final String CREATE_METRICS_METADATA_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS METRICS_METADATA " +
-      "(METRIC_NAME VARCHAR, " +
-      "APP_ID VARCHAR, " +
-      "INSTANCE_ID VARCHAR, " +
-      "UUID BINARY(16), " +
-      "UNITS CHAR(20), " +
-      "TYPE CHAR(20), " +
-      "START_TIME UNSIGNED_LONG, " +
-      "SUPPORTS_AGGREGATION BOOLEAN, " +
-      "IS_WHITELISTED BOOLEAN " +
-      "CONSTRAINT pk PRIMARY KEY (METRIC_NAME, APP_ID)) " +
-      "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
-
-  public static final String CREATE_HOSTED_APPS_METADATA_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS HOSTED_APPS_METADATA " +
-      "(HOSTNAME VARCHAR, UUID BINARY(4), APP_IDS VARCHAR, " +
-      "CONSTRAINT pk PRIMARY KEY (HOSTNAME))" +
-      "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
-
-  public static final String CREATE_INSTANCE_HOST_TABLE_SQL =
-    "CREATE TABLE IF NOT EXISTS INSTANCE_HOST_METADATA " +
-      "(INSTANCE_ID VARCHAR, HOSTNAME VARCHAR, " +
-      "CONSTRAINT pk PRIMARY KEY (INSTANCE_ID, HOSTNAME))" +
-      "DATA_BLOCK_ENCODING='%s', COMPRESSION='%s'";
-
-  public static final String ALTER_METRICS_METADATA_TABLE =
-    "ALTER TABLE METRICS_METADATA ADD IF NOT EXISTS IS_WHITELISTED BOOLEAN";
-
-  /**
-   * ALTER table to set new options
-   */
-  public static final String ALTER_SQL = "ALTER TABLE %s SET TTL=%s";
-
-  /**
-   * Insert into metric records table.
-   */
-  public static final String UPSERT_METRICS_SQL = "UPSERT INTO %s " +
-    "(UUID, " +
-    "SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN, " +
-    "METRIC_COUNT, " +
-    "METRICS) VALUES " +
-    "(?, ?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_CONTAINER_METRICS_SQL = "UPSERT INTO %s " +
-      "(APP_ID,"
-      + " CONTAINER_ID,"
-      + " START_TIME,"
-      + " FINISH_TIME,"
-      + " DURATION,"
-      + " HOSTNAME,"
-      + " EXIT_CODE,"
-      + " LOCALIZATION_DURATION,"
-      + " LAUNCH_DURATION,"
-      + " MEM_REQUESTED_GB,"
-      + " MEM_REQUESTED_GB_MILLIS,"
-      + " MEM_VIRTUAL_GB,"
-      + " MEM_USED_GB_MIN,"
-      + " MEM_USED_GB_MAX,"
-      + " MEM_USED_GB_AVG,"
-      + " MEM_USED_GB_50_PCT,"
-      + " MEM_USED_GB_75_PCT,"
-      + " MEM_USED_GB_90_PCT,"
-      + " MEM_USED_GB_95_PCT,"
-      + " MEM_USED_GB_99_PCT,"
-      + " MEM_UNUSED_GB,"
-      + " MEM_UNUSED_GB_MILLIS) VALUES " +
-      "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_CLUSTER_AGGREGATE_SQL = "UPSERT INTO " +
-    "%s (UUID, " +
-    "SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "HOSTS_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN) " +
-    "VALUES (?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_CLUSTER_AGGREGATE_TIME_SQL = "UPSERT INTO" 
+
-    " %s (UUID, SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "METRIC_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN) " +
-    "VALUES (?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_AGGREGATE_RECORD_SQL = "UPSERT INTO " +
-    "%s (UUID, " +
-    "SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN," +
-    "METRIC_COUNT) " +
-    "VALUES (?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_METADATA_SQL =
-    "UPSERT INTO METRICS_METADATA (METRIC_NAME, APP_ID, INSTANCE_ID, UUID, 
UNITS, TYPE, " +
-      "START_TIME, SUPPORTS_AGGREGATION, IS_WHITELISTED) " +
-      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
-
-  public static final String UPSERT_HOSTED_APPS_METADATA_SQL =
-    "UPSERT INTO HOSTED_APPS_METADATA (HOSTNAME, UUID, APP_IDS) VALUES (?, ?, 
?)";
-
-  public static final String UPSERT_INSTANCE_HOST_METADATA_SQL =
-    "UPSERT INTO INSTANCE_HOST_METADATA (INSTANCE_ID, HOSTNAME) VALUES (?, ?)";
-
-  /**
-   * Retrieve a set of rows from metrics records table.
-   */
-  public static final String GET_METRIC_SQL = "SELECT UUID, SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN, " +
-    "METRIC_COUNT, " +
-    "METRICS " +
-    "FROM %s";
-
-  /**
-   * Get latest metrics for a number of hosts
-   *
-   * Different queries for a number and a single hosts are used due to bug
-   * in Apache Phoenix
-   */
-  public static final String GET_LATEST_METRIC_SQL = "SELECT %s E.UUID AS 
UUID, " +
-    "E.SERVER_TIME AS SERVER_TIME, " +
-    "E.METRIC_SUM AS METRIC_SUM, " +
-    "E.METRIC_MAX AS METRIC_MAX, E.METRIC_MIN AS METRIC_MIN, " +
-    "E.METRIC_COUNT AS METRIC_COUNT, E.METRICS AS METRICS " +
-    "FROM %s AS E " +
-    "INNER JOIN " +
-    "(SELECT UUID, MAX(SERVER_TIME) AS MAX_SERVER_TIME " +
-    "FROM %s " +
-    "WHERE " +
-    "%s " +
-    "GROUP BY UUID) " +
-    "AS I " +
-    "ON E.UUID=I.UUID " +
-    "AND E.SERVER_TIME=I.MAX_SERVER_TIME";
-
-  public static final String GET_METRIC_AGGREGATE_ONLY_SQL = "SELECT UUID, " +
-    "SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN, " +
-    "METRIC_COUNT " +
-    "FROM %s";
-
-  public static final String GET_CLUSTER_AGGREGATE_SQL = "SELECT " +
-    "UUID, " +
-    "SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "HOSTS_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN " +
-    "FROM %s";
-
-  public static final String GET_CLUSTER_AGGREGATE_TIME_SQL = "SELECT " +
-    "UUID, " +
-    "SERVER_TIME, " +
-    "METRIC_SUM, " +
-    "METRIC_COUNT, " +
-    "METRIC_MAX, " +
-    "METRIC_MIN " +
-    "FROM %s";
-
-  public static final String TOP_N_INNER_SQL = "SELECT UUID " +
-    "FROM %s WHERE %s GROUP BY UUID ORDER BY %s LIMIT %s";
-
-  public static final String GET_METRIC_METADATA_SQL = "SELECT " +
-    "METRIC_NAME, APP_ID, INSTANCE_ID, UUID, UNITS, TYPE, START_TIME, " +
-    "SUPPORTS_AGGREGATION, IS_WHITELISTED FROM METRICS_METADATA";
-
-  public static final String GET_HOSTED_APPS_METADATA_SQL = "SELECT " +
-    "HOSTNAME, UUID, APP_IDS FROM HOSTED_APPS_METADATA";
-
-  public static final String GET_INSTANCE_HOST_METADATA_SQL = "SELECT " +
-    "INSTANCE_ID, HOSTNAME FROM INSTANCE_HOST_METADATA";
-
-  /**
-   * Aggregate host metrics using a GROUP BY clause to take advantage of
-   * N - way parallel scan where N = number of regions.
-   */
-  public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT 
" +
-    "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, 
METRIC_MIN) " +
-    "SELECT UUID, %s AS SERVER_TIME, " +
-    "SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
-    "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " +
-    "GROUP BY UUID";
-
-  /**
-   * Downsample host metrics.
-   */
-  public static final String DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX = 
"UPSERT INTO %s (UUID, SERVER_TIME, " +
-    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) ";
-
-  public static final String TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL = "SELECT 
UUID, " +
-    "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME 
> %s AND SERVER_TIME <= %s " +
-    "GROUP BY UUID ORDER BY %s DESC LIMIT %s";
-
-  /**
-   * Aggregate app metrics using a GROUP BY clause to take advantage of
-   * N - way parallel scan where N = number of regions.
-   */
-  public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " 
+
-         "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, 
METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
-         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), 
MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
-         "SERVER_TIME <= %s GROUP BY UUID";
-
-  /**
-   * Downsample cluster metrics.
-   */
-  public static final String DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX = 
"UPSERT INTO %s (UUID, SERVER_TIME, " +
-    "METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) ";
-
-  public static final String TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL = 
"SELECT UUID, " +
-    "%s AS SERVER_TIME, %s, 1, %s, %s FROM %s WHERE UUID IN %s AND SERVER_TIME 
> %s AND SERVER_TIME <= %s " +
-    "GROUP BY UUID ORDER BY %s DESC LIMIT %s";
-
-  public static final String METRICS_RECORD_TABLE_NAME = "METRIC_RECORD";
-
-  public static final String CONTAINER_METRICS_TABLE_NAME = 
"CONTAINER_METRICS";
-
-  public static final String METRICS_AGGREGATE_MINUTE_TABLE_NAME =
-    "METRIC_RECORD_MINUTE";
-  public static final String METRICS_AGGREGATE_HOURLY_TABLE_NAME =
-    "METRIC_RECORD_HOURLY";
-  public static final String METRICS_AGGREGATE_DAILY_TABLE_NAME =
-    "METRIC_RECORD_DAILY";
-  public static final String METRICS_CLUSTER_AGGREGATE_TABLE_NAME =
-    "METRIC_AGGREGATE";
-  public static final String METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME =
-    "METRIC_AGGREGATE_MINUTE";
-  public static final String METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME =
-    "METRIC_AGGREGATE_HOURLY";
-  public static final String METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME =
-    "METRIC_AGGREGATE_DAILY";
-
-  public static final Pattern PHOENIX_TABLES_REGEX_PATTERN = 
Pattern.compile("METRIC_");
-
-  public static final String[] PHOENIX_TABLES = {
-    METRICS_RECORD_TABLE_NAME,
-    METRICS_AGGREGATE_MINUTE_TABLE_NAME,
-    METRICS_AGGREGATE_HOURLY_TABLE_NAME,
-    METRICS_AGGREGATE_DAILY_TABLE_NAME,
-    METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
-    METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
-    METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
-    METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME
-  };
-
-  public static final String DEFAULT_TABLE_COMPRESSION = "SNAPPY";
-  public static final String DEFAULT_ENCODING = "FAST_DIFF";
-  public static final long HOUR = 3600000; // 1 hour
-  public static final long DAY = 86400000; // 1 day
-  private static boolean sortMergeJoinEnabled = false;
-
-  /**
-   * Filter to optimize HBase scan by using file timestamps. This prevents
-   * a full table scan of metric records.
-   *
-   * @return Phoenix Hint String
-   */
-  public static String getNaiveTimeRangeHint(Long startTime, Long delta) {
-    return String.format("/*+ NATIVE_TIME_RANGE(%s) */", (startTime - delta));
-  }
-
-  /**
-   * Falling back to sort merge join algorithm if default queries fail.
-   *
-   * @return Phoenix Hint String
-   */
-  public static String getLatestMetricsHints() {
-    if (sortMergeJoinEnabled) {
-      return "/*+ USE_SORT_MERGE_JOIN NO_CACHE */";
-    }
-    return "";
-  }
-
-  public static void setSortMergeJoinEnabled(boolean sortMergeJoinEnabled) {
-    PhoenixTransactSQL.sortMergeJoinEnabled = sortMergeJoinEnabled;
-  }
-
-  public static PreparedStatement prepareGetMetricsSqlStmt(Connection 
connection,
-                                                           Condition 
condition) throws SQLException {
-
-    validateConditionIsNotEmpty(condition);
-    validateRowCountLimit(condition);
-
-    String stmtStr;
-    if (condition.getStatement() != null) {
-      stmtStr = condition.getStatement();
-    } else {
-      String metricsTable;
-      String query;
-      if (condition.getPrecision() == null) {
-        long endTime = condition.getEndTime() == null ? 
System.currentTimeMillis() : condition.getEndTime();
-        long startTime = condition.getStartTime() == null ? 0 : 
condition.getStartTime();
-        Precision precision = Precision.getPrecision(startTime, endTime);
-        condition.setPrecision(precision);
-      }
-      switch (condition.getPrecision()) {
-        case DAYS:
-          metricsTable = METRICS_AGGREGATE_DAILY_TABLE_NAME;
-          query = GET_METRIC_AGGREGATE_ONLY_SQL;
-          break;
-        case HOURS:
-          metricsTable = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-          query = GET_METRIC_AGGREGATE_ONLY_SQL;
-          break;
-        case MINUTES:
-          metricsTable = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-          query = GET_METRIC_AGGREGATE_ONLY_SQL;
-          break;
-        default:
-          metricsTable = METRICS_RECORD_TABLE_NAME;
-          query = GET_METRIC_SQL;
-      }
-
-      stmtStr = String.format(query, metricsTable);
-    }
-
-    StringBuilder sb = new StringBuilder(stmtStr);
-
-    if (!(condition instanceof EmptyCondition)) {
-      sb.append(" WHERE ");
-      sb.append(condition.getConditionClause());
-      String orderByClause = condition.getOrderByClause(true);
-      if (orderByClause != null) {
-        sb.append(orderByClause);
-      } else {
-        sb.append(" ORDER BY UUID, SERVER_TIME ");
-      }
-    }
-
-    if (condition.getLimit() != null) {
-      sb.append(" LIMIT ").append(condition.getLimit());
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL: " + sb.toString() + ", condition: " + condition);
-    }
-
-    PreparedStatement stmt = null;
-    try {
-      stmt = connection.prepareStatement(sb.toString());
-      int pos = 1;
-      pos = addUuids(condition, pos, stmt);
-
-      if (condition instanceof TopNCondition) {
-        pos = addStartTime(condition, pos, stmt);
-        pos = addEndTime(condition, pos, stmt);
-      }
-
-      pos = addStartTime(condition, pos, stmt);
-      addEndTime(condition, pos, stmt);
-
-      if (condition.getFetchSize() != null) {
-        stmt.setFetchSize(condition.getFetchSize());
-      }
-    } catch (SQLException e) {
-      if (stmt != null) {
-        stmt.close();
-      }
-      throw e;
-    }
-
-    if (condition instanceof TopNCondition) {
-      LOG.info(sb.toString());
-    }
-    return stmt;
-  }
-
-  private static void validateConditionIsNotEmpty(Condition condition) {
-    if (condition.isEmpty()) {
-      throw new IllegalArgumentException("Condition is empty.");
-    }
-  }
-
-  private static void validateRowCountLimit(Condition condition) {
-    if (condition.getMetricNames() == null
-      || condition.getMetricNames().isEmpty()) {
-      //aggregator can use empty metrics query
-      return;
-    }
-
-    long range = condition.getEndTime() - condition.getStartTime();
-    long rowsPerMetric;
-
-    //Get Precision (passed in or computed) and estimate values returned based 
on that.
-    Precision precision = condition.getPrecision();
-    if (precision == null) {
-      precision = Precision.getPrecision(condition.getStartTime(), 
condition.getEndTime());
-    }
-
-    switch (precision) {
-      case DAYS:
-        rowsPerMetric = TimeUnit.MILLISECONDS.toDays(range);
-        break;
-      case HOURS:
-        rowsPerMetric = TimeUnit.MILLISECONDS.toHours(range);
-        break;
-      case MINUTES:
-        rowsPerMetric = TimeUnit.MILLISECONDS.toMinutes(range)/5; //5 minute 
data in METRIC_AGGREGATE_MINUTE table.
-        break;
-      default:
-        rowsPerMetric = TimeUnit.MILLISECONDS.toSeconds(range)/10; //10 second 
data in METRIC_AGGREGATE table
-    }
-
-    List<String> hostNames = condition.getHostnames();
-    int numHosts = (hostNames == null || hostNames.isEmpty()) ? 1 : 
condition.getHostnames().size();
-
-    long totalRowsRequested = rowsPerMetric * 
condition.getMetricNames().size() * numHosts;
-
-    if (totalRowsRequested > PhoenixHBaseAccessor.RESULTSET_LIMIT) {
-      throw new PrecisionLimitExceededException("Requested " +  
condition.getMetricNames().size() + " metrics for "
-        + numHosts + " hosts in " + precision +  " precision for the time 
range of " + range/1000
-        + " seconds. Estimated resultset size of " + totalRowsRequested + " is 
greater than the limit of "
-        + PhoenixHBaseAccessor.RESULTSET_LIMIT + ". Request lower precision or 
fewer number of metrics or hosts." +
-        " Alternatively, increase the limit value through 
ams-site:timeline.metrics.service.default.result.limit config");
-    }
-  }
-
-  public static PreparedStatement prepareGetLatestMetricSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
-
-    validateConditionIsNotEmpty(condition);
-
-    if (condition.getMetricNames() == null
-      || condition.getMetricNames().isEmpty()) {
-      throw new IllegalArgumentException("Point in time query without "
-        + "metric names not supported ");
-    }
-
-    String stmtStr;
-    if (condition.getStatement() != null) {
-      stmtStr = condition.getStatement();
-    } else {
-      stmtStr = String.format(GET_LATEST_METRIC_SQL,
-        getLatestMetricsHints(),
-        METRICS_RECORD_TABLE_NAME,
-        METRICS_RECORD_TABLE_NAME,
-        condition.getConditionClause());
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL: " + stmtStr + ", condition: " + condition);
-    }
-    PreparedStatement stmt = null;
-    try {
-      stmt = connection.prepareStatement(stmtStr);
-      setQueryParameters(stmt, condition);
-    } catch (SQLException e) {
-      if (stmt != null) {
-        stmt.close();
-      }
-      throw e;
-    }
-
-    return stmt;
-  }
-
-  private static PreparedStatement setQueryParameters(PreparedStatement stmt,
-                                                      Condition condition) 
throws SQLException {
-    int pos = 1;
-    //For GET_LATEST_METRIC_SQL_SINGLE_HOST parameters should be set 2 times
-    do {
-      if (condition.getUuids() != null) {
-        for (byte[] uuid : condition.getUuids()) {
-          stmt.setBytes(pos++, uuid);
-        }
-      }
-      if (condition.getFetchSize() != null) {
-        stmt.setFetchSize(condition.getFetchSize());
-        pos++;
-      }
-    } while (pos < stmt.getParameterMetaData().getParameterCount());
-
-    return stmt;
-  }
-
-  public static PreparedStatement prepareGetAggregateSqlStmt(
-    Connection connection, Condition condition) throws SQLException {
-
-    validateConditionIsNotEmpty(condition);
-    validateRowCountLimit(condition);
-
-    String metricsAggregateTable;
-    String queryStmt;
-    if (condition.getPrecision() == null) {
-      long endTime = condition.getEndTime() == null ? 
System.currentTimeMillis() : condition.getEndTime();
-      long startTime = condition.getStartTime() == null ? 0 : 
condition.getStartTime();
-      condition.setPrecision(Precision.getPrecision(startTime, endTime));
-    }
-    switch (condition.getPrecision()) {
-      case DAYS:
-        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
-        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
-        break;
-      case HOURS:
-        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
-        break;
-      case MINUTES:
-        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
-        queryStmt = GET_CLUSTER_AGGREGATE_TIME_SQL;
-        break;
-      default:
-        metricsAggregateTable = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-        queryStmt = GET_CLUSTER_AGGREGATE_SQL;
-    }
-
-    queryStmt = String.format(queryStmt, metricsAggregateTable);
-
-    StringBuilder sb = new StringBuilder(queryStmt);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    sb.append(" ORDER BY UUID, SERVER_TIME");
-    if (condition.getLimit() != null) {
-      sb.append(" LIMIT ").append(condition.getLimit());
-    }
-
-    String query = sb.toString();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL => " + query + ", condition => " + condition);
-    }
-    PreparedStatement stmt = null;
-    try {
-      stmt = connection.prepareStatement(query);
-      int pos = 1;
-
-      pos = addUuids(condition, pos, stmt);
-
-      if (condition instanceof TopNCondition) {
-        pos = addStartTime(condition, pos, stmt);
-        pos = addEndTime(condition, pos, stmt);
-      }
-
-      // TODO: Upper case all strings on POST
-      pos = addStartTime(condition, pos, stmt);
-      addEndTime(condition, pos, stmt);
-    } catch (SQLException e) {
-      if (stmt != null) {
-        stmt.close();
-      }
-      throw e;
-    }
-
-    if (condition instanceof TopNCondition) {
-      LOG.info(sb.toString());
-    }
-    return stmt;
-  }
-
-  public static PreparedStatement prepareGetLatestAggregateMetricSqlStmt(
-    Connection connection, SplitByMetricNamesCondition condition) throws 
SQLException {
-
-    validateConditionIsNotEmpty(condition);
-
-    String stmtStr;
-    if (condition.getStatement() != null) {
-      stmtStr = condition.getStatement();
-    } else {
-      stmtStr = String.format(GET_CLUSTER_AGGREGATE_SQL,
-        METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
-    }
-
-    StringBuilder sb = new StringBuilder(stmtStr);
-    sb.append(" WHERE ");
-    sb.append(condition.getConditionClause());
-    String orderByClause = condition.getOrderByClause(false);
-    if (orderByClause != null) {
-      sb.append(orderByClause);
-    } else {
-      sb.append(" ORDER BY UUID DESC, SERVER_TIME DESC  ");
-    }
-
-    sb.append(" LIMIT ").append(condition.getMetricNames().size());
-
-    String query = sb.toString();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("SQL: " + query + ", condition: " + condition);
-    }
-
-    PreparedStatement stmt = null;
-    try {
-      stmt = connection.prepareStatement(query);
-      int pos = 1;
-      if (condition.getMetricNames() != null) {
-        for (; pos <= condition.getMetricNames().size(); pos++) {
-          stmt.setBytes(pos, condition.getCurrentUuid());
-        }
-      }
-    } catch (SQLException e) {
-      if (stmt != null) {
-
-      }
-      throw e;
-    }
-
-    return stmt;
-  }
-
-  public static String getTargetTableUsingPrecision(Precision precision, 
boolean withHosts) {
-
-    String inputTable = null;
-    if (precision != null) {
-      if (withHosts) {
-        switch (precision) {
-          case DAYS:
-            inputTable = PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
-            break;
-          case HOURS:
-            inputTable = 
PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-            break;
-          case MINUTES:
-            inputTable = 
PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-            break;
-          default:
-            inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-        }
-      } else {
-        switch (precision) {
-          case DAYS:
-            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
-            break;
-          case HOURS:
-            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-            break;
-          case MINUTES:
-            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
-            break;
-          default:
-            inputTable = 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-        }
-      }
-    } else {
-      if (withHosts) {
-        inputTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-      } else {
-        inputTable = PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-      }
-    }
-    return inputTable;
-  }
-
-  private static int addUuids(Condition condition, int pos, PreparedStatement 
stmt) throws SQLException {
-    if (condition.getUuids() != null) {
-      for (int pos2 = 1 ; pos2 <= condition.getUuids().size(); pos2++,pos++) {
-        byte[] uuid = condition.getUuids().get(pos2 - 1);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Setting pos: " + pos + ", value = " + new String(uuid));
-        }
-
-        if (uuid.length != TimelineMetricMetadataManager.HOSTNAME_UUID_LENGTH 
+ TimelineMetricMetadataManager.TIMELINE_METRIC_UUID_LENGTH) {
-          stmt.setString(pos, new String(uuid));
-        } else {
-          stmt.setBytes(pos, uuid);
-        }
-      }
-    }
-    return pos;
-  }
-
-  private static int addStartTime(Condition condition, int pos, 
PreparedStatement stmt) throws SQLException {
-    if (condition.getStartTime() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getStartTime());
-      }
-      stmt.setLong(pos++, condition.getStartTime());
-    }
-    return pos;
-  }
-
-  private static int addEndTime(Condition condition, int pos, 
PreparedStatement stmt) throws SQLException {
-
-    if (condition.getEndTime() != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Setting pos: " + pos + ", value: " + 
condition.getEndTime());
-      }
-      stmt.setLong(pos++, condition.getEndTime());
-    }
-    return pos;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
deleted file mode 100644
index 6eadcea..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/SplitByMetricNamesCondition.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-// TODO get rid of this class
-public class SplitByMetricNamesCondition implements Condition {
-  private final Condition adaptee;
-  private byte[] currentUuid;
-  private boolean metricNamesNotCondition = false;
-
-  public SplitByMetricNamesCondition(Condition condition){
-    this.adaptee = condition;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return adaptee.isEmpty();
-  }
-
-  @Override
-  public List<byte[]> getUuids() {
-    return adaptee.getUuids();
-  }
-
-  @Override
-  public List<String> getMetricNames() {
-    return Collections.singletonList(new String(currentUuid));
-  }
-
-  @Override
-  public boolean isPointInTime() {
-    return adaptee.isPointInTime();
-  }
-
-  @Override
-  public boolean isGrouped() {
-    return adaptee.isGrouped();
-  }
-
-  @Override
-  public void setStatement(String statement) {
-    adaptee.setStatement(statement);
-  }
-
-  @Override
-  public List<String> getHostnames() {
-    return adaptee.getHostnames();
-  }
-
-  @Override
-  public Precision getPrecision() {
-    return adaptee.getPrecision();
-  }
-
-  @Override
-  public void setPrecision(Precision precision) {
-    adaptee.setPrecision(precision);
-  }
-
-  @Override
-  public String getAppId() {
-    return adaptee.getAppId();
-  }
-
-  @Override
-  public String getInstanceId() {
-    return adaptee.getInstanceId();
-  }
-
-  @Override
-  public StringBuilder getConditionClause() {
-    StringBuilder sb = new StringBuilder();
-    boolean appendConjunction = false;
-
-    if (getMetricNames() != null) {
-      for (String name : getMetricNames()) {
-        if (sb.length() > 1) {
-          sb.append(" OR ");
-        }
-        sb.append("UUID = ?");
-      }
-
-      appendConjunction = true;
-    }
-
-    appendConjunction = DefaultCondition.append(sb, appendConjunction,
-      getStartTime(), " SERVER_TIME >= ?");
-    DefaultCondition.append(sb, appendConjunction, getEndTime(),
-      " SERVER_TIME < ?");
-
-    return sb;
-  }
-
-  @Override
-  public String getOrderByClause(boolean asc) {
-    return adaptee.getOrderByClause(asc);
-  }
-
-  @Override
-  public String getStatement() {
-    return adaptee.getStatement();
-  }
-
-  @Override
-  public Long getStartTime() {
-    return adaptee.getStartTime();
-  }
-
-  @Override
-  public Long getEndTime() {
-    return adaptee.getEndTime();
-  }
-
-  @Override
-  public Integer getLimit() {
-    return adaptee.getLimit();
-  }
-
-  @Override
-  public Integer getFetchSize() {
-    return adaptee.getFetchSize();
-  }
-
-  @Override
-  public void setFetchSize(Integer fetchSize) {
-    adaptee.setFetchSize(fetchSize);
-  }
-
-  @Override
-  public void addOrderByColumn(String column) {
-    adaptee.addOrderByColumn(column);
-  }
-
-  @Override
-  public void setNoLimit() {
-    adaptee.setNoLimit();
-  }
-
-  @Override
-  public boolean doUpdate() {
-    return false;
-  }
-
-  public List<String> getOriginalMetricNames() {
-    return adaptee.getMetricNames();
-  }
-
-  public void setCurrentUuid(byte[] uuid) {
-    this.currentUuid = uuid;
-  }
-
-  public byte[] getCurrentUuid() {
-    return currentUuid;
-  }
-
- @Override
-  public void setMetricNamesNotCondition(boolean metricNamesNotCondition) {
-    this.metricNamesNotCondition = metricNamesNotCondition;
-  }
-
-  @Override
-  public void setHostnamesNotCondition(boolean hostNamesNotCondition) {
-
-  }
-
-  @Override
-  public void setUuidNotCondition(boolean uuidNotCondition) {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
deleted file mode 100644
index 4a5491f..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/TopNCondition.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-
-public class TopNCondition extends DefaultCondition{
-
-  private Integer topN;
-  private boolean isBottomN;
-  private Function topNFunction;
-  private static final Log LOG = LogFactory.getLog(TopNCondition.class);
-
-  public TopNCondition(List<byte[]> uuids, List<String> metricNames, 
List<String> hostnames, String appId,
-                          String instanceId, Long startTime, Long endTime, 
Precision precision,
-                          Integer limit, boolean grouped, Integer topN, 
Function topNFunction,
-                          boolean isBottomN) {
-    super(uuids, metricNames, hostnames, appId, instanceId, startTime, 
endTime, precision, limit, grouped);
-    this.topN = topN;
-    this.isBottomN = isBottomN;
-    this.topNFunction = topNFunction;
-  }
-
-  @Override
-  public StringBuilder getConditionClause() {
-
-
-    if (!(isTopNHostCondition(metricNames, hostnames) || 
isTopNMetricCondition(metricNames, hostnames))) {
-      LOG.error("Unsupported TopN Operation requested. Query can have either 
multiple hosts or multiple metric names " +
-        "but not both.");
-      return null;
-    }
-
-    StringBuilder sb = new StringBuilder();
-    sb.append(" UUID IN (");
-    sb.append(getTopNInnerQuery());
-    sb.append(")");
-
-    boolean appendConjunction = true;
-    appendConjunction = append(sb, appendConjunction, getStartTime(), " 
SERVER_TIME >= ?");
-    append(sb, appendConjunction, getEndTime(), " SERVER_TIME < ?");
-
-    return sb;
-  }
-
-  public String getTopNInnerQuery() {
-    return String.format(PhoenixTransactSQL.TOP_N_INNER_SQL,
-      PhoenixTransactSQL.getTargetTableUsingPrecision(precision, 
CollectionUtils.isNotEmpty(hostnames)),
-      super.getConditionClause().toString(), getTopNOrderByClause(), topN);
-  }
-
-  private String getTopNOrderByClause() {
-
-    String orderByClause = getColumnSelect(this.topNFunction);
-    orderByClause += (isBottomN ? " ASC" : " DESC");
-    return  orderByClause;
-  }
-
-  public static String getColumnSelect(Function topNFunction) {
-    String columnSelect = null;
-    if (topNFunction != null) {
-      switch (topNFunction.getReadFunction()) {
-        case AVG:
-          columnSelect = "ROUND(AVG(METRIC_SUM),2)";
-          break;
-        case SUM:
-          columnSelect = "SUM(METRIC_SUM)";
-          break;
-        default:
-          columnSelect = "MAX(METRIC_MAX)";
-          break;
-      }
-    }
-    if (columnSelect == null) {
-      columnSelect = "MAX(METRIC_MAX)";
-    }
-    return  columnSelect;
-  }
-
-  public boolean isTopNHostCondition() {
-    return isTopNHostCondition(metricNames, hostnames);
-  }
-
-  public boolean isTopNMetricCondition() {
-    return isTopNMetricCondition(metricNames, hostnames);
-  }
-
-  /**
-   * Check if this is a case of Top N hosts condition
-   * @param metricNames A list of Strings.
-   * @param hostnames A list of Strings.
-   * @return True if it is a Case of Top N Hosts (1 Metric and H hosts).
-   */
-  public static boolean isTopNHostCondition(List<String> metricNames, 
List<String> hostnames) {
-    // Case 1 : 1 Metric, H hosts
-    // Select Top N or Bottom N host series based on 1 metric (max/avg/sum)
-    // Hostnames cannot be empty
-    // Only 1 metric allowed, without wildcards
-    return (CollectionUtils.isNotEmpty(hostnames) && metricNames.size() == 1 
&& !metricNamesHaveWildcard(metricNames));
-
-  }
-
-  /**
-   * Check if this is a case of Top N metrics condition
-   * @param metricNames A list of Strings.
-   * @param hostnames A list of Strings.
-   * @return True if it is a Case of Top N Metrics (M Metric and 1 or 0 host).
-   */
-  public static boolean isTopNMetricCondition(List<String> metricNames, 
List<String> hostnames) {
-    // Case 2 : M Metric names or Regex, 1 or No host
-    // Select Top N or Bottom N metric series based on metric 
values(max/avg/sum)
-    // MetricNames cannot be empty
-    // No host (aggregate) or 1 host allowed, without wildcards
-    return (CollectionUtils.isNotEmpty(metricNames) && (hostnames == null || 
hostnames.size() <= 1) &&
-      !hostNamesHaveWildcard(hostnames));
-  }
-
-  public Integer getTopN() {
-    return topN;
-  }
-
-  public void setTopN(Integer topN) {
-    this.topN = topN;
-  }
-
-  public boolean isBottomN() {
-    return isBottomN;
-  }
-
-  public void setIsBottomN(boolean isBottomN) {
-    this.isBottomN = isBottomN;
-  }
-
-  public Function getTopNFunction() {
-    return topNFunction;
-  }
-
-  public void setTopNFunction(Function topNFunction) {
-    this.topNFunction = topNFunction;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
deleted file mode 100644
index 6ec6cf9..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Date;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-
-public class DefaultFSSinkProvider implements ExternalSinkProvider {
-  private static final Log LOG = 
LogFactory.getLog(DefaultFSSinkProvider.class);
-  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
-  private final DefaultExternalMetricsSink sink = new 
DefaultExternalMetricsSink();
-  private long FIXED_FILE_SIZE;
-  private final String SINK_FILE_NAME = "external-metrics-sink.dat";
-  private final String SEPARATOR = ", ";
-  private final String LINE_SEP = System.lineSeparator();
-  private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, 
START_TIME, DATA";
-
-  public DefaultFSSinkProvider() {
-    try {
-      FIXED_FILE_SIZE = 
conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize",
 FileUtils.ONE_MB * 100);
-    } catch (Exception ignored) {
-      FIXED_FILE_SIZE = FileUtils.ONE_MB * 100;
-    }
-  }
-
-  @Override
-  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
-    return sink;
-  }
-
-  class DefaultExternalMetricsSink implements ExternalMetricsSink {
-
-    @Override
-    public int getSinkTimeOutSeconds() {
-      return 10;
-    }
-
-    @Override
-    public int getFlushSeconds() {
-      try {
-        return 
conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
-      } catch (Exception e) {
-        LOG.warn("Cannot read cache commit interval.");
-      }
-      return 3;
-    }
-
-    private boolean createFile(File f) {
-      boolean created = false;
-      if (!f.exists()) {
-        try {
-          created = f.createNewFile();
-          FileUtils.writeStringToFile(f, HEADERS);
-        } catch (IOException e) {
-          LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath());
-          return false;
-        }
-      }
-
-      return created;
-    }
-
-    private boolean shouldReCreate(File f) {
-      if (!f.exists()) {
-        return true;
-      }
-      if (FileUtils.sizeOf(f) > FIXED_FILE_SIZE) {
-        return true;
-      }
-      return false;
-    }
-
-    @Override
-    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
-      String dirPath = 
TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir();
-      File dir = new File(dirPath);
-      if (!dir.exists()) {
-        LOG.error("Cannot sink data to file system, incorrect dir path " + 
dirPath);
-        return;
-      }
-
-      File f = FileUtils.getFile(dirPath, SINK_FILE_NAME);
-      if (shouldReCreate(f)) {
-        if (!f.delete()) {
-          LOG.warn("Unable to delete external sink file.");
-          return;
-        }
-        createFile(f);
-      }
-
-      if (metrics != null) {
-        for (TimelineMetrics timelineMetrics : metrics) {
-          for (TimelineMetric metric : timelineMetrics.getMetrics()) {
-            StringBuilder sb = new StringBuilder();
-            sb.append(metric.getMetricName());
-            sb.append(SEPARATOR);
-            sb.append(metric.getAppId());
-            sb.append(SEPARATOR);
-            if (StringUtils.isEmpty(metric.getInstanceId())) {
-              sb.append(SEPARATOR);
-            } else {
-              sb.append(metric.getInstanceId());
-              sb.append(SEPARATOR);
-            }
-            if (StringUtils.isEmpty(metric.getHostName())) {
-              sb.append(SEPARATOR);
-            } else {
-              sb.append(metric.getHostName());
-              sb.append(SEPARATOR);
-            }
-            sb.append(new Date(metric.getStartTime()));
-            sb.append(SEPARATOR);
-            sb.append(metric.getMetricValues().toString());
-            sb.append(LINE_SEP);
-            try {
-              FileUtils.writeStringToFile(f, sb.toString());
-            } catch (IOException e) {
-              LOG.warn("Unable to sink data to file " + f.getPath());
-            }
-          }
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
deleted file mode 100644
index ff06307..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import java.util.Collection;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface ExternalMetricsSink {
-  /**
-   * How many seconds to wait on sink before dropping metrics.
-   * Note: Care should be taken that this timeout does not bottleneck the
-   * sink thread.
-   */
-  int getSinkTimeOutSeconds();
-
-  /**
-   * How frequently to flush data to external system.
-   * Default would be between 60 - 120 seconds, coherent with default sink
-   * interval of AMS.
-   */
-  int getFlushSeconds();
-
-  /**
-   * Raw data stream to process / store on external system.
-   * The data will be held in an in-memory cache and flushed at flush seconds
-   * or when the cache size limit is exceeded we will flush the cache and
-   * drop data if write fails.
-   *
-   * @param metrics {@link Collection<TimelineMetrics>}
-   */
-  void sinkMetricData(Collection<TimelineMetrics> metrics);
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
deleted file mode 100644
index 7c7683b..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
-
-
-/**
- * Configurable provider for sink classes that match the metrics sources.
- * Provider can return same sink or different sinks for each source.
- */
-public interface ExternalSinkProvider {
-
-  /**
-   * Return an instance of the metrics sink for the give source
-   * @return {@link ExternalMetricsSink}
-   */
-  ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName);
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
deleted file mode 100644
index 9c2a93e..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.KeyStore;
-import java.util.Collection;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManagerFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-import org.apache.http.client.utils.URIBuilder;
-import org.codehaus.jackson.map.AnnotationIntrospector;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
-
-public class HttpSinkProvider implements ExternalSinkProvider {
-  private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class);
-  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
-
-  private String connectUrl;
-  private SSLSocketFactory sslSocketFactory;
-  protected static ObjectMapper mapper;
-
-  static {
-    mapper = new ObjectMapper();
-    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
-    mapper.setAnnotationIntrospector(introspector);
-    mapper.getSerializationConfig()
-      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
-  }
-
-  public HttpSinkProvider() {
-    Configuration config;
-    try {
-      config = conf.getMetricsConf();
-    } catch (Exception e) {
-      throw new ExceptionInInitializerError("Unable to read configuration for 
sink.");
-    }
-    String protocol = 
config.get("timeline.metrics.service.external.http.sink.protocol", "http");
-    String host = 
config.get("timeline.metrics.service.external.http.sink.host", "localhost");
-    String port = 
config.get("timeline.metrics.service.external.http.sink.port", "6189");
-
-    if (protocol.contains("https")) {
-      loadTruststore(
-        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"),
-        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"),
-        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password")
-      );
-    }
-
-    URIBuilder uriBuilder = new URIBuilder();
-    uriBuilder.setScheme(protocol);
-    uriBuilder.setHost(host);
-    uriBuilder.setPort(Integer.parseInt(port));
-    connectUrl = uriBuilder.toString();
-  }
-
-  @Override
-  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
-    return new DefaultHttpMetricsSink();
-  }
-
-  protected HttpURLConnection getConnection(String spec) throws IOException {
-    return (HttpURLConnection) new URL(spec).openConnection();
-  }
-
-  // Get an ssl connection
-  protected HttpsURLConnection getSSLConnection(String spec)
-    throws IOException, IllegalStateException {
-
-    HttpsURLConnection connection = (HttpsURLConnection) (new 
URL(spec).openConnection());
-    connection.setSSLSocketFactory(sslSocketFactory);
-    return connection;
-  }
-
-  protected void loadTruststore(String trustStorePath, String trustStoreType,
-                                String trustStorePassword) {
-    if (sslSocketFactory == null) {
-      if (trustStorePath == null || trustStorePassword == null) {
-        String msg = "Can't load TrustStore. Truststore path or password is 
not set.";
-        LOG.error(msg);
-        throw new IllegalStateException(msg);
-      }
-      FileInputStream in = null;
-      try {
-        in = new FileInputStream(new File(trustStorePath));
-        KeyStore store = KeyStore.getInstance(trustStoreType == null ?
-          KeyStore.getDefaultType() : trustStoreType);
-        store.load(in, trustStorePassword.toCharArray());
-        TrustManagerFactory tmf = TrustManagerFactory
-          .getInstance(TrustManagerFactory.getDefaultAlgorithm());
-        tmf.init(store);
-        SSLContext context = SSLContext.getInstance("TLS");
-        context.init(null, tmf.getTrustManagers(), null);
-        sslSocketFactory = context.getSocketFactory();
-      } catch (Exception e) {
-        LOG.error("Unable to load TrustStore", e);
-      } finally {
-        if (in != null) {
-          try {
-            in.close();
-          } catch (IOException e) {
-            LOG.error("Unable to load TrustStore", e);
-          }
-        }
-      }
-    }
-  }
-
-  class DefaultHttpMetricsSink implements ExternalMetricsSink {
-
-    @Override
-    public int getSinkTimeOutSeconds() {
-      try {
-        return 
conf.getMetricsConf().getInt("timeline.metrics.external.sink.http.timeout.seconds",
 10);
-      } catch (Exception e) {
-        return 10;
-      }
-    }
-
-    @Override
-    public int getFlushSeconds() {
-      try {
-        return 
conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
-      } catch (Exception e) {
-        LOG.warn("Cannot read cache commit interval.");
-      }
-      return 3;
-    }
-
-    /**
-     * Cleans up and closes an input stream
-     * see 
http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
-     * @param is the InputStream to clean up
-     * @return string read from the InputStream
-     * @throws IOException
-     */
-    protected String cleanupInputStream(InputStream is) throws IOException {
-      StringBuilder sb = new StringBuilder();
-      if (is != null) {
-        try (
-          InputStreamReader isr = new InputStreamReader(is);
-          BufferedReader br = new BufferedReader(isr)
-        ) {
-          // read the response body
-          String line;
-          while ((line = br.readLine()) != null) {
-            if (LOG.isDebugEnabled()) {
-              sb.append(line);
-            }
-          }
-        } finally {
-          is.close();
-        }
-      }
-      return sb.toString();
-    }
-
-    @Override
-    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
-      HttpURLConnection connection = null;
-      try {
-        connection = connectUrl.startsWith("https") ? 
getSSLConnection(connectUrl) : getConnection(connectUrl);
-
-        connection.setRequestMethod("POST");
-        connection.setRequestProperty("Content-Type", "application/json");
-        connection.setRequestProperty("Connection", "Keep-Alive");
-        connection.setConnectTimeout(getSinkTimeOutSeconds());
-        connection.setReadTimeout(getSinkTimeOutSeconds());
-        connection.setDoOutput(true);
-
-        if (metrics != null) {
-          String jsonData = mapper.writeValueAsString(metrics);
-          try (OutputStream os = connection.getOutputStream()) {
-            os.write(jsonData.getBytes("UTF-8"));
-          }
-        }
-
-        int statusCode = connection.getResponseCode();
-
-        if (statusCode != 200) {
-          LOG.info("Unable to POST metrics to external sink, " + connectUrl +
-            ", statusCode = " + statusCode);
-        } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Metrics posted to external sink " + connectUrl);
-          }
-        }
-        cleanupInputStream(connection.getInputStream());
-
-      } catch (IOException io) {
-        LOG.warn("Unable to sink data to external system.", io);
-      }
-    }
-  }
-}

Reply via email to