http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
deleted file mode 100644
index 5c370f4..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-
-public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator {
-  private final TimelineMetricReadHelper readHelper;
-  private final boolean isClusterPrecisionInputTable;
-
-  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
-                                         TimelineMetricMetadataManager 
metricMetadataManager,
-                                         PhoenixHBaseAccessor hBaseAccessor,
-                                         Configuration metricsConf,
-                                         String checkpointLocation,
-                                         Long sleepIntervalMillis,
-                                         Integer checkpointCutOffMultiplier,
-                                         String hostAggregatorDisabledParam,
-                                         String inputTableName,
-                                         String outputTableName,
-                                         Long nativeTimeRangeDelay,
-                                         MetricCollectorHAController 
haController) {
-    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
-      sleepIntervalMillis, checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam, inputTableName, outputTableName,
-      nativeTimeRangeDelay, haController);
-    isClusterPrecisionInputTable = 
inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
-    readHelper = new TimelineMetricReadHelper(metricMetadataManager, true);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL, tableName);
-    // HOST_COUNT vs METRIC_COUNT
-    if (isClusterPrecisionInputTable) {
-      sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL, tableName);
-    }
-
-    condition.setStatement(sqlStr);
-    condition.addOrderByColumn("UUID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
IOException, SQLException {
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = 
aggregateMetricsFromResultSet(rs, endTime);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, 
outputTableName);
-  }
-
-  private Map<TimelineClusterMetric, MetricHostAggregate> 
aggregateMetricsFromResultSet(ResultSet rs, long endTime)
-    throws IOException, SQLException {
-
-    TimelineClusterMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
-      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
-    int perMetricCount = 0;
-
-    while (rs.next()) {
-      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
-
-      MetricClusterAggregate currentHostAggregate =
-        isClusterPrecisionInputTable ?
-          readHelper.getMetricClusterAggregateFromResultSet(rs) :
-          readHelper.getMetricClusterTimeAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        currentMetric.setTimestamp(endTime);
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        perMetricCount++;
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-        perMetricCount++;
-      } else {
-        // Switched over to a new metric - save new metric
-
-        hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
-        
hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples()
 / (float)perMetricCount));
-        perMetricCount = 1;
-
-        hostAggregate = new MetricHostAggregate();
-        currentMetric.setTimestamp(endTime);
-        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-
-    }
-
-    return hostAggregateMap;
-  }
-
-  private void updateAggregatesFromHost(MetricHostAggregate agg, 
MetricClusterAggregate currentClusterAggregate) {
-    agg.updateMax(currentClusterAggregate.getMax());
-    agg.updateMin(currentClusterAggregate.getMin());
-    agg.updateSum(currentClusterAggregate.getSum());
-    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
deleted file mode 100644
index bbe8f7b..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-
-/**
- * Aggregates a metric across all hosts in the cluster. Reads metrics from
- * the precision table and saves into the aggregate.
- */
-public class TimelineMetricClusterAggregatorSecond extends 
AbstractTimelineAggregator {
-  public Long timeSliceIntervalMillis;
-  private TimelineMetricReadHelper timelineMetricReadHelper;
-  // Aggregator to perform app-level aggregates for host metrics
-  private final TimelineMetricAppAggregator appAggregator;
-  // 1 minute client side buffering adjustment
-  protected final Long serverTimeShiftAdjustment;
-  protected final boolean interpolationEnabled;
-  private TimelineMetricMetadataManager metadataManagerInstance;
-  private String skipAggrPatternStrings;
-  private final static String liveHostsMetricName = "live_hosts";
-
-  public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName,
-                                               TimelineMetricMetadataManager 
metadataManager,
-                                               PhoenixHBaseAccessor 
hBaseAccessor,
-                                               Configuration metricsConf,
-                                               String checkpointLocation,
-                                               Long sleepIntervalMillis,
-                                               Integer 
checkpointCutOffMultiplier,
-                                               String aggregatorDisabledParam,
-                                               String tableName,
-                                               String outputTableName,
-                                               Long nativeTimeRangeDelay,
-                                               Long timeSliceInterval,
-                                               MetricCollectorHAController 
haController) {
-    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
-      sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay, haController);
-
-    this.metadataManagerInstance = metadataManager;
-    appAggregator = new TimelineMetricAppAggregator(metadataManager, 
metricsConf);
-    this.timeSliceIntervalMillis = timeSliceInterval;
-    this.serverTimeShiftAdjustment = 
Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000"));
-    this.interpolationEnabled = 
Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED,
 "true"));
-    this.skipAggrPatternStrings = 
metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
-    this.timelineMetricReadHelper = new 
TimelineMetricReadHelper(metadataManager, true);
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
SQLException, IOException {
-    // Account for time shift due to client side buffering by shifting the
-    // timestamps with the difference between server time and series start time
-    // Also, we do not want to look at the shift time period from the end as 
well since we can interpolate those points
-    // that come earlier than the expected, during the next run.
-    List<Long[]> timeSlices = getTimeSlices(startTime - 
serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, 
timeSliceIntervalMillis);
-    // Initialize app aggregates for host metrics
-    appAggregator.init();
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics 
=
-      aggregateMetricsFromResultSet(rs, timeSlices);
-
-    LOG.info("Saving " + aggregateClusterMetrics.size() + " metric 
aggregates.");
-    hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-    appAggregator.cleanup();
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-
-    List<String> metricNames = new ArrayList<>();
-    boolean metricNamesNotCondition = false;
-
-    if (!StringUtils.isEmpty(skipAggrPatternStrings)) {
-      LOG.info("Skipping aggregation for metric patterns : " + 
skipAggrPatternStrings);
-      metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(",")));
-      metricNamesNotCondition = true;
-    }
-
-    Condition condition = new DefaultCondition(metricNames, null, null, null, 
startTime - serverTimeShiftAdjustment,
-      endTime, null, null, true);
-    condition.setMetricNamesNotCondition(metricNamesNotCondition);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_SQL,
-      METRICS_RECORD_TABLE_NAME));
-    // Retaining order of the row-key avoids client side merge sort.
-    condition.addOrderByColumn("UUID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  Map<TimelineClusterMetric, MetricClusterAggregate> 
aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
-      throws SQLException, IOException {
-    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics 
=
-      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
-    TimelineMetric metric = null;
-    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
-    if (rs.next()) {
-      metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-
-      // Call slice after all rows for a host are read
-      while (rs.next()) {
-        TimelineMetric nextMetric = 
timelineMetricReadHelper.getTimelineMetricFromResultSet(rs);
-        // If rows belong to same host combine them before slicing. This
-        // avoids issues across rows that belong to same hosts but get
-        // counted as coming from different ones.
-        if (metric.equalsExceptTime(nextMetric)) {
-          metric.addMetricValues(nextMetric.getMetricValues());
-        } else {
-          // Process the current metric
-          int numHosts = 
processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices);
-          if (!hostedAppCounter.containsKey(metric.getAppId())) {
-            hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
-          } else {
-            int currentHostCount = 
hostedAppCounter.get(metric.getAppId()).intValue();
-            if (currentHostCount < numHosts) {
-              hostedAppCounter.put(metric.getAppId(), new 
MutableInt(numHosts));
-            }
-          }
-          metric = nextMetric;
-        }
-      }
-    }
-    // Process last metric
-    if (metric != null) {
-      int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, 
metric, timeSlices);
-      if (!hostedAppCounter.containsKey(metric.getAppId())) {
-        hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
-      } else {
-        int currentHostCount = 
hostedAppCounter.get(metric.getAppId()).intValue();
-        if (currentHostCount < numHosts) {
-          hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts));
-        }
-      }
-    }
-
-    // Add app level aggregates to save
-    aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics());
-
-    // Add liveHosts per AppId metrics.
-    long timestamp = timeSlices.get(timeSlices.size() - 1)[1];
-    processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, 
timestamp);
-
-    return aggregateClusterMetrics;
-  }
-
-  /**
-   * Slice metric values into interval specified by :
-   * timeline.metrics.cluster.aggregator.minute.timeslice.interval
-   * Normalize value by averaging them within the interval
-   */
-  protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, 
MetricClusterAggregate> aggregateClusterMetrics,
-                                              TimelineMetric metric, 
List<Long[]> timeSlices) {
-    // Create time slices
-    TimelineMetricMetadataKey appKey =  new 
TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), 
metric.getInstanceId());
-    TimelineMetricMetadata metricMetadata = 
metadataManagerInstance.getMetadataCacheValue(appKey);
-
-    if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) {
-      LOG.debug("Skipping cluster aggregation for " + metric.getMetricName());
-      return 0;
-    }
-
-    Map<TimelineClusterMetric, Double> clusterMetrics = 
sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled);
-
-    return aggregateClusterMetricsFromSlices(clusterMetrics, 
aggregateClusterMetrics, metric.getHostName());
-  }
-
-  protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, 
Double> clusterMetrics,
-                                                  Map<TimelineClusterMetric, 
MetricClusterAggregate> aggregateClusterMetrics,
-                                                  String hostname) {
-
-    int numHosts = 0;
-    if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
-      for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : 
clusterMetrics.entrySet()) {
-
-        TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
-        Double avgValue = clusterMetricEntry.getValue();
-
-        MetricClusterAggregate aggregate = 
aggregateClusterMetrics.get(clusterMetric);
-
-        if (aggregate == null) {
-          aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, 
avgValue);
-          aggregateClusterMetrics.put(clusterMetric, aggregate);
-        } else {
-          aggregate.updateSum(avgValue);
-          aggregate.updateNumberOfHosts(1);
-          aggregate.updateMax(avgValue);
-          aggregate.updateMin(avgValue);
-        }
-
-        numHosts = aggregate.getNumberOfHosts();
-        // Update app level aggregates
-        appAggregator.processTimelineClusterMetric(clusterMetric, hostname, 
avgValue);
-      }
-    }
-    return numHosts;
-  }
-
-  /* Add cluster metric for number of hosts that are hosting an appId */
-  protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, 
MetricClusterAggregate> aggregateClusterMetrics,
-      Map<String, MutableInt> appHostsCount, long timestamp) {
-
-    for (Map.Entry<String, MutableInt> appHostsEntry : 
appHostsCount.entrySet()) {
-      TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(
-        liveHostsMetricName, appHostsEntry.getKey(), null, timestamp);
-
-      Integer numOfHosts = appHostsEntry.getValue().intValue();
-
-      MetricClusterAggregate metricClusterAggregate = new 
MetricClusterAggregate(
-        (double) numOfHosts, 1, null, (double) numOfHosts, (double) 
numOfHosts);
-
-      metadataManagerInstance.getUuid(timelineClusterMetric);
-
-      aggregateClusterMetrics.put(timelineClusterMetric, 
metricClusterAggregate);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
deleted file mode 100644
index dc31086..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricDistributedCache;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-
-public class TimelineMetricClusterAggregatorSecondWithCacheSource extends 
TimelineMetricClusterAggregatorSecond {
-  private TimelineMetricDistributedCache distributedCache;
-  public 
TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME
 metricAggregateSecond, TimelineMetricMetadataManager metricMetadataManager, 
PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String 
checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, 
String aggregatorDisabledParam, String inputTableName, String outputTableName,
-                                                              Long 
nativeTimeRangeDelay,
-                                                              Long 
timeSliceInterval,
-                                                              
MetricCollectorHAController haController, TimelineMetricDistributedCache 
distributedCache) {
-    super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, 
metricsConf, checkpointLocation, sleepIntervalMillis, 
checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName, 
outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController);
-    this.distributedCache = distributedCache;
-  }
-
-  @Override
-  public boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-          "startTime = " + new Date(startTime) + ", endTime = " + new 
Date(endTime));
-    try {
-      Map<String, Double> caheMetrics;
-      if (LOG.isDebugEnabled()) {
-        caheMetrics = distributedCache.getPointInTimeCacheMetrics();
-        LOG.debug("Ignite metrics before eviction : " + caheMetrics);
-      }
-
-      LOG.info("Trying to evict elements from cache");
-      Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = 
distributedCache.evictMetricAggregates(startTime - serverTimeShiftAdjustment, 
endTime - serverTimeShiftAdjustment);
-      LOG.info(String.format("Evicted %s elements from cache.", 
metricsFromCache.size()));
-
-      if (LOG.isDebugEnabled()) {
-        caheMetrics = distributedCache.getPointInTimeCacheMetrics();
-        LOG.debug("Ignite metrics after eviction : " + caheMetrics);
-      }
-
-      List<Long[]> timeSlices = getTimeSlices(startTime - 
serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, 
timeSliceIntervalMillis);
-      Map<TimelineClusterMetric, MetricClusterAggregate> result = 
aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeSlices);
-
-      LOG.info("Saving " + result.size() + " metric aggregates.");
-      hBaseAccessor.saveClusterAggregateRecords(result);
-      LOG.info("End aggregation cycle @ " + new Date());
-      return true;
-    } catch (Exception e) {
-      LOG.error("Exception during aggregation. ", e);
-      return false;
-    }
-  }
-
-  //Slices in cache could be different from aggregate slices, so need to 
recalculate. Counts hosted apps
-  Map<TimelineClusterMetric, MetricClusterAggregate> 
aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, 
MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
-    //TODO add basic interpolation
-    //TODO investigate if needed, maybe add config to disable/enable
-    //count hosted apps
-    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
-    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> 
clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
-      int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts();
-      String appId = clusterMetricAggregateEntry.getKey().getAppId();
-      if (!hostedAppCounter.containsKey(appId)) {
-        hostedAppCounter.put(appId, new MutableInt(numHosts));
-      } else {
-        int currentHostCount = hostedAppCounter.get(appId).intValue();
-        if (currentHostCount < numHosts) {
-          hostedAppCounter.put(appId, new MutableInt(numHosts));
-        }
-      }
-    }
-
-    // Add liveHosts per AppId metrics.
-    processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, 
timeSlices.get(timeSlices.size() - 1)[1]);
-
-    return metricsFromCache;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
deleted file mode 100644
index a75d2c4..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-
-public class TimelineMetricFilteringHostAggregator extends 
TimelineMetricHostAggregator {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);
-  private TimelineMetricMetadataManager metricMetadataManager;
-  private ConcurrentHashMap<String, Long> postedAggregatedMap;
-
-  public 
TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME 
aggregatorName,
-                                               TimelineMetricMetadataManager 
metricMetadataManager,
-                                               PhoenixHBaseAccessor 
hBaseAccessor,
-                                               Configuration metricsConf,
-                                               String checkpointLocation,
-                                               Long sleepIntervalMillis,
-                                               Integer 
checkpointCutOffMultiplier,
-                                               String 
hostAggregatorDisabledParam,
-                                               String tableName,
-                                               String outputTableName,
-                                               Long nativeTimeRangeDelay,
-                                               MetricCollectorHAController 
haController,
-                                               ConcurrentHashMap<String, Long> 
postedAggregatedMap) {
-    super(aggregatorName, metricMetadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      tableName,
-      outputTableName,
-      nativeTimeRangeDelay,
-      haController);
-    this.metricMetadataManager = metricMetadataManager;
-    this.postedAggregatedMap = postedAggregatedMap;
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-    List<String> aggregatedHostnames = new ArrayList<>();
-    for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
-      if (entry.getValue() > startTime && entry.getValue() <= endTime) {
-        aggregatedHostnames.add(entry.getKey());
-      }
-    }
-    List<String> notAggregatedHostnames = 
metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " 
+ aggregatedHostnames);
-      LOG.debug("Hostnames that will be aggregated : " + 
notAggregatedHostnames);
-    }
-    List<byte[]> uuids = metricMetadataManager.getUuids(new 
ArrayList<String>(), notAggregatedHostnames, "", "");
-
-    Condition condition = new DefaultCondition(uuids, null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, 
tableName));
-    // Retaining order of the row-key avoids client side merge sort.
-    condition.addOrderByColumn("UUID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
deleted file mode 100644
index 6a11599..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-
-public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricHostAggregator.class);
-  TimelineMetricReadHelper readHelper;
-
-  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
-                                      TimelineMetricMetadataManager 
metricMetadataManager,
-                                      PhoenixHBaseAccessor hBaseAccessor,
-                                      Configuration metricsConf,
-                                      String checkpointLocation,
-                                      Long sleepIntervalMillis,
-                                      Integer checkpointCutOffMultiplier,
-                                      String hostAggregatorDisabledParam,
-                                      String tableName,
-                                      String outputTableName,
-                                      Long nativeTimeRangeDelay,
-                                      MetricCollectorHAController 
haController) {
-    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
-      sleepIntervalMillis, checkpointCutOffMultiplier, 
hostAggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay, haController);
-    readHelper = new TimelineMetricReadHelper(metricMetadataManager, false);
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
IOException, SQLException {
-
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = 
aggregateMetricsFromResultSet(rs, endTime);
-
-    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
-      endTime, null, null, true);
-    condition.setNoLimit();
-    condition.setFetchSize(resultsetFetchSize);
-    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, 
tableName));
-    // Retaining order of the row-key avoids client side merge sort.
-    condition.addOrderByColumn("UUID");
-    condition.addOrderByColumn("SERVER_TIME");
-    return condition;
-  }
-
-  private Map<TimelineMetric, MetricHostAggregate> 
aggregateMetricsFromResultSet(ResultSet rs, long endTime)
-      throws IOException, SQLException {
-    TimelineMetric existingMetric = null;
-    MetricHostAggregate hostAggregate = null;
-    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =  new 
HashMap<TimelineMetric, MetricHostAggregate>();
-
-
-    while (rs.next()) {
-      TimelineMetric currentMetric =
-        readHelper.getTimelineMetricKeyFromResultSet(rs);
-      MetricHostAggregate currentHostAggregate =
-        readHelper.getMetricHostAggregateFromResultSet(rs);
-
-      if (existingMetric == null) {
-        // First row
-        existingMetric = currentMetric;
-        currentMetric.setStartTime(endTime);
-        hostAggregate = new MetricHostAggregate();
-        hostAggregateMap.put(currentMetric, hostAggregate);
-      }
-
-      if (existingMetric.equalsExceptTime(currentMetric)) {
-        // Recalculate totals with current metric
-        hostAggregate.updateAggregates(currentHostAggregate);
-      } else {
-        // Switched over to a new metric - save existing - create new aggregate
-        currentMetric.setStartTime(endTime);
-        hostAggregate = new MetricHostAggregate();
-        hostAggregate.updateAggregates(currentHostAggregate);
-        hostAggregateMap.put(currentMetric, hostAggregate);
-        existingMetric = currentMetric;
-      }
-    }
-    return hostAggregateMap;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
deleted file mode 100644
index 5d31b51..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.TreeMap;
-
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-
-public class TimelineMetricReadHelper {
-
-  private boolean ignoreInstance = false;
-  private TimelineMetricMetadataManager metadataManagerInstance = null;
-
-  public TimelineMetricReadHelper() {}
-
-  public TimelineMetricReadHelper(boolean ignoreInstance) {
-    this.ignoreInstance = ignoreInstance;
-  }
-
-  public TimelineMetricReadHelper(TimelineMetricMetadataManager 
timelineMetricMetadataManager) {
-    this.metadataManagerInstance = timelineMetricMetadataManager;
-  }
-
-  public TimelineMetricReadHelper(TimelineMetricMetadataManager 
timelineMetricMetadataManager, boolean ignoreInstance) {
-    this.metadataManagerInstance = timelineMetricMetadataManager;
-    this.ignoreInstance = ignoreInstance;
-  }
-
-  public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
-      throws SQLException, IOException {
-    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
-    TreeMap<Long, Double> sortedByTimeMetrics = 
PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
-    metric.setMetricValues(sortedByTimeMetrics);
-    return metric;
-  }
-
-  public SingleValuedTimelineMetric 
getAggregatedTimelineMetricFromResultSet(ResultSet rs,
-      Function f) throws SQLException, IOException {
-
-    byte[] uuid = rs.getBytes("UUID");
-    TimelineMetric timelineMetric = 
metadataManagerInstance.getMetricFromUuid(uuid);
-    Function function = (f != null) ? f : Function.DEFAULT_VALUE_FUNCTION;
-    SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
-      timelineMetric.getMetricName() + function.getSuffix(),
-      timelineMetric.getAppId(),
-      timelineMetric.getInstanceId(),
-      timelineMetric.getHostName(),
-      rs.getLong("SERVER_TIME")
-    );
-
-    double value;
-    switch(function.getReadFunction()){
-      case AVG:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
-        break;
-      case MIN:
-        value = rs.getDouble("METRIC_MIN");
-        break;
-      case MAX:
-        value = rs.getDouble("METRIC_MAX");
-        break;
-      case SUM:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
-        break;
-      default:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
-        break;
-    }
-
-    metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
-
-    return metric;
-  }
-
-  /**
-   * Returns common part of timeline metrics record without the values.
-   */
-  public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
-      throws SQLException {
-
-    byte[] uuid = rs.getBytes("UUID");
-    TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid);
-    if (ignoreInstance) {
-      metric.setInstanceId(null);
-    }
-    metric.setStartTime(rs.getLong("SERVER_TIME"));
-    return metric;
-  }
-
-  public MetricClusterAggregate 
getMetricClusterAggregateFromResultSet(ResultSet rs)
-      throws SQLException {
-    MetricClusterAggregate agg = new MetricClusterAggregate();
-    agg.setSum(rs.getDouble("METRIC_SUM"));
-    agg.setMax(rs.getDouble("METRIC_MAX"));
-    agg.setMin(rs.getDouble("METRIC_MIN"));
-    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));
-
-    agg.setDeviation(0.0);
-
-    return agg;
-  }
-
-  public MetricClusterAggregate 
getMetricClusterTimeAggregateFromResultSet(ResultSet rs)
-      throws SQLException {
-    MetricClusterAggregate agg = new MetricClusterAggregate();
-    agg.setSum(rs.getDouble("METRIC_SUM"));
-    agg.setMax(rs.getDouble("METRIC_MAX"));
-    agg.setMin(rs.getDouble("METRIC_MIN"));
-    agg.setNumberOfHosts(rs.getInt("METRIC_COUNT"));
-
-    agg.setDeviation(0.0);
-
-    return agg;
-  }
-
-  public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException 
{
-
-    byte[] uuid = rs.getBytes("UUID");
-    TimelineMetric timelineMetric = 
metadataManagerInstance.getMetricFromUuid(uuid);
-
-    return new TimelineClusterMetric(
-      timelineMetric.getMetricName(),
-      timelineMetric.getAppId(),
-      ignoreInstance ? null : timelineMetric.getInstanceId(),
-      rs.getLong("SERVER_TIME"));
-  }
-
-  public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
-      throws SQLException {
-    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
-    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
-    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
-    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
-    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));
-
-    metricHostAggregate.setDeviation(0.0);
-    return metricHostAggregate;
-  }
-
-  public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
-      throws SQLException, IOException {
-    byte[] uuid = rs.getBytes("UUID");
-    return metadataManagerInstance.getMetricFromUuid(uuid);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TopNDownSampler.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TopNDownSampler.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TopNDownSampler.java
deleted file mode 100644
index d55d026..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TopNDownSampler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
-
-public class TopNDownSampler implements CustomDownSampler {
-
-  private TopNConfig topNConfig;
-  private static final Log LOG = LogFactory.getLog(TopNDownSampler.class);
-  protected String metricPatterns;
-
-  public static TopNDownSampler fromConfig(Map<String, String> conf) {
-    String metricPatterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix 
+ "topn." +
-      DownSamplerUtils.downSamplerMetricPatternsConfig);
-
-    String topNString = conf.get(DownSamplerUtils.downSamplerConfigPrefix + 
"topn.value");
-    Integer topNValue = topNString != null ? Integer.valueOf(topNString) : 10;
-    String topNFunction = conf.get(DownSamplerUtils.downSamplerConfigPrefix + 
"topn.function");
-
-    return new TopNDownSampler(new TopNConfig(topNValue, topNFunction, false), 
metricPatterns);
-  }
-
-  public TopNDownSampler(TopNConfig topNConfig, String metricPatterns) {
-    this.topNConfig = topNConfig;
-    this.metricPatterns = metricPatterns;
-  }
-
-  @Override
-  public boolean validateConfigs() {
-    if (topNConfig == null) {
-      return false;
-    }
-
-    if (topNConfig.getTopN() <= 0) {
-      return false;
-    }
-
-    if (StringUtils.isEmpty(metricPatterns)) {
-      return false;
-    }
-
-    return true;
-  }
-
-  /**
-   * Prepare downsampling SELECT statement(s) used to determine the data to be 
written into the Aggregate table.
-   * @param startTime
-   * @param endTime
-   * @param tableName
-   * @return
-   */
-  @Override
-  public List<String> prepareDownSamplingStatement(Long startTime, Long 
endTime, String tableName) {
-    List<String> stmts = new ArrayList<>();
-
-    Function.ReadFunction readFunction = 
Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
-    Function function = new Function(readFunction, null);
-    String columnSelect = TopNCondition.getColumnSelect(function);
-
-    List<String> metricPatternList = Arrays.asList(metricPatterns.split(","));
-
-    for (String metricPattern : metricPatternList) {
-      String metricPatternClause = "'" + metricPattern + "'";
-      //TODO : Need a better way to find out what kind of aggregation the 
current one is.
-      if (tableName.contains("RECORD")) {
-        stmts.add(String.format(TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL,
-          endTime, columnSelect, columnSelect, columnSelect, tableName, 
metricPatternClause,
-          startTime, endTime, columnSelect, topNConfig.getTopN()));
-      } else {
-        stmts.add(String.format(TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL,
-          endTime, columnSelect, columnSelect, columnSelect, tableName, 
metricPatternClause,
-          startTime, endTime, columnSelect, topNConfig.getTopN()));
-      }
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("DownSampling Stmt: " + stmts.toString());
-    }
-
-    return stmts;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
deleted file mode 100644
index 06552a6..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
-
-public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator {
-  private final String aggregateColumnName;
-
-  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
-                                         PhoenixHBaseAccessor hBaseAccessor,
-                                         Configuration metricsConf,
-                                         String checkpointLocation,
-                                         Long sleepIntervalMillis,
-                                         Integer checkpointCutOffMultiplier,
-                                         String hostAggregatorDisabledParam,
-                                         String inputTableName,
-                                         String outputTableName,
-                                         Long nativeTimeRangeDelay,
-                                         MetricCollectorHAController 
haController) {
-    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
-      sleepIntervalMillis, checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam, inputTableName, outputTableName,
-      nativeTimeRangeDelay, haController);
-
-    if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
-      aggregateColumnName = "HOSTS_COUNT";
-    } else {
-      aggregateColumnName = "METRIC_COUNT";
-    }
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-    EmptyCondition condition = new EmptyCondition();
-    condition.setDoUpdate(true);
-
-    /*
-    UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
-    SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
-    SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
-    SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
-    FROM METRIC_AGGREGATE WHERE SERVER_TIME >= 1441155600000 AND
-    SERVER_TIME < 1441159200000 GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, 
UNITS;
-     */
-
-    condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, aggregateColumnName, tableName,
-      getDownsampledMetricSkipClause(), startTime, endTime));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Condition: " + condition.toString());
-    }
-
-    return condition;
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
IOException, SQLException {
-    LOG.info("Aggregated cluster metrics for " + outputTableName +
-      ", with startTime = " + new Date(startTime) +
-      ", endTime = " + new Date(endTime));
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
deleted file mode 100644
index a15ab2e..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
-
-public class TimelineMetricFilteringHostAggregator extends 
TimelineMetricHostAggregator {
-  private TimelineMetricMetadataManager metricMetadataManager;
-  private ConcurrentHashMap<String, Long> postedAggregatedMap;
-
-  public 
TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME 
aggregatorName,
-                                               TimelineMetricMetadataManager 
metricMetadataManager,
-                                               PhoenixHBaseAccessor 
hBaseAccessor,
-                                               Configuration metricsConf,
-                                               String checkpointLocation,
-                                               Long sleepIntervalMillis,
-                                               Integer 
checkpointCutOffMultiplier,
-                                               String 
hostAggregatorDisabledParam,
-                                               String tableName,
-                                               String outputTableName,
-                                               Long nativeTimeRangeDelay,
-                                               MetricCollectorHAController 
haController,
-                                               ConcurrentHashMap<String, Long> 
postedAggregatedMap) {
-    super(aggregatorName,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      tableName,
-      outputTableName,
-      nativeTimeRangeDelay,
-      haController);
-    this.metricMetadataManager = metricMetadataManager;
-    this.postedAggregatedMap = postedAggregatedMap;
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-    List<String> aggregatedHostnames = new ArrayList<>();
-    for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
-      if (entry.getValue() > startTime && entry.getValue() <= endTime) {
-        aggregatedHostnames.add(entry.getKey());
-      }
-    }
-    List<String> notAggregatedHostnames = 
metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " 
+ aggregatedHostnames);
-      LOG.debug("Hostnames that will be aggregated : " + 
notAggregatedHostnames);
-    }
-    List<byte[]> uuids = metricMetadataManager.getUuids(new 
ArrayList<String>(), notAggregatedHostnames, "", "");
-
-    EmptyCondition condition = new EmptyCondition();
-    condition.setDoUpdate(true);
-
-    
condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, tableName,
-      getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), 
startTime, endTime));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Condition: " + condition.toString());
-    }
-
-    return condition;
-  }
-
-  private String getIncludedUuidsClause(List<byte[]> uuids) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("(");
-
-    //LIKE clause
-    // (UUID LIKE ? OR UUID LIKE ?) AND
-    if (CollectionUtils.isNotEmpty(uuids)) {
-      for (int i = 0; i < uuids.size(); i++) {
-        sb.append("UUID");
-        sb.append(" LIKE ");
-        sb.append("'%");
-        sb.append(new String(uuids.get(i)));
-        sb.append("'");
-
-        if (i == uuids.size() - 1) {
-          sb.append(") AND ");
-        } else {
-          sb.append(" OR ");
-        }
-      }
-    }
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
deleted file mode 100644
index 3eb2be3..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;
-
-import java.io.IOException;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
-
-public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
-
-  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
-                                      PhoenixHBaseAccessor hBaseAccessor,
-                                      Configuration metricsConf,
-                                      String checkpointLocation,
-                                      Long sleepIntervalMillis,
-                                      Integer checkpointCutOffMultiplier,
-                                      String hostAggregatorDisabledParam,
-                                      String tableName,
-                                      String outputTableName,
-                                      Long nativeTimeRangeDelay,
-                                      MetricCollectorHAController 
haController) {
-    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
-      sleepIntervalMillis, checkpointCutOffMultiplier, 
hostAggregatorDisabledParam,
-      tableName, outputTableName, nativeTimeRangeDelay, haController);
-  }
-
-  @Override
-  protected void aggregate(ResultSet rs, long startTime, long endTime) throws 
IOException, SQLException {
-
-    LOG.info("Aggregated host metrics for " + outputTableName +
-      ", with startTime = " + new Date(startTime) +
-      ", endTime = " + new Date(endTime));
-  }
-
-  @Override
-  protected Condition prepareMetricQueryCondition(long startTime, long 
endTime) {
-    EmptyCondition condition = new EmptyCondition();
-    condition.setDoUpdate(true);
-
-    
condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
-      outputTableName, endTime, tableName,
-      getDownsampledMetricSkipClause(), startTime, endTime));
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Condition: " + condition.toString());
-    }
-
-    return condition;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
deleted file mode 100644
index fef9dc9..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.participant.StateMachineEngine;
-
-public class AggregationTaskRunner {
-  private final String instanceName;
-  private final String zkAddress;
-  private final String clusterName;
-  private HelixManager manager;
-  private static final Log LOG = 
LogFactory.getLog(AggregationTaskRunner.class);
-  private CheckpointManager checkpointManager;
-  // Map partition name to an aggregator dimension
-  static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new 
HashMap<>();
-  // Ownership flags to be set by the State transitions
-  private final AtomicBoolean performsClusterAggregation = new 
AtomicBoolean(false);
-  private final AtomicBoolean performsHostAggregation = new 
AtomicBoolean(false);
-
-  public enum AGGREGATOR_NAME {
-    METRIC_RECORD_MINUTE,
-    METRIC_RECORD_HOURLY,
-    METRIC_RECORD_DAILY,
-    METRIC_AGGREGATE_SECOND,
-    METRIC_AGGREGATE_MINUTE,
-    METRIC_AGGREGATE_HOURLY,
-    METRIC_AGGREGATE_DAILY,
-  }
-
-  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = 
new HashMap<>();
-
-  static {
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, 
"TimelineMetricHostAggregatorMinute");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, 
"TimelineMetricHostAggregatorHourly");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, 
"TimelineMetricHostAggregatorDaily");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, 
"TimelineClusterAggregatorSecond");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, 
"TimelineClusterAggregatorMinute");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, 
"TimelineClusterAggregatorHourly");
-    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, 
"TimelineClusterAggregatorDaily");
-
-    // Partition name to task assignment
-    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER);
-    PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST);
-  }
-
-  public AggregationTaskRunner(String instanceName, String zkAddress, String 
clusterName) {
-    this.instanceName = instanceName;
-    this.zkAddress = zkAddress;
-    this.clusterName = clusterName;
-  }
-
-  public void initialize() throws Exception {
-    manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
-      InstanceType.PARTICIPANT, zkAddress);
-
-    OnlineOfflineStateModelFactory stateModelFactory =
-      new OnlineOfflineStateModelFactory(instanceName, this);
-
-    StateMachineEngine stateMach = manager.getStateMachineEngine();
-    stateMach.registerStateModelFactory(DEFAULT_STATE_MODEL, 
stateModelFactory);
-    manager.connect();
-
-    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
-  }
-
-  public boolean performsClusterAggregation() {
-    return performsClusterAggregation.get();
-  }
-
-  public boolean performsHostAggregation() {
-    return performsHostAggregation.get();
-  }
-
-  public CheckpointManager getCheckpointManager() {
-    return checkpointManager;
-  }
-
-  public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) {
-    switch (type) {
-      case HOST:
-        performsHostAggregation.set(true);
-        LOG.info("Set host aggregator function for : " + instanceName);
-        break;
-      case CLUSTER:
-        performsClusterAggregation.set(true);
-        LOG.info("Set cluster aggregator function for : " + instanceName);
-    }
-  }
-
-  public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) {
-    switch (type) {
-      case HOST:
-        performsHostAggregation.set(false);
-        LOG.info("Unset host aggregator function for : " + instanceName);
-        break;
-      case CLUSTER:
-        performsClusterAggregation.set(false);
-        LOG.info("Unset cluster aggregator function for : " + instanceName);
-    }
-  }
-
-  /**
-   * Disconnect participant before controller shutdown
-   */
-  void stop() {
-    manager.disconnect();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
deleted file mode 100644
index 3293ead..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-
-import org.I0Itec.zkclient.DataUpdater;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import org.apache.helix.AccessOption;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.zookeeper.data.Stat;
-
-public class CheckpointManager {
-  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
-  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);
-
-  static final String ZNODE_FIELD = "checkpoint";
-  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";
-
-  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    this.propertyStore = propertyStore;
-  }
-
-  /**
-   * Read aggregator checkpoint from zookeeper
-   *
-   * @return timestamp
-   */
-  public long readCheckpoint(AGGREGATOR_NAME aggregatorName) {
-    String path = getCheckpointZKPath(aggregatorName);
-    LOG.debug("Reading checkpoint at " + path);
-    Stat stat = new Stat();
-    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Stat => " + stat);
-    }
-    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, 
-1) : -1;
-    LOG.debug("Checkpoint value = " + checkpoint);
-    return checkpoint;
-  }
-
-  /**
-   * Write aggregator checkpoint in zookeeper
-   *
-   * @param value timestamp
-   * @return sucsess
-   */
-  public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) {
-    String path = getCheckpointZKPath(aggregatorName);
-    LOG.debug(String.format("Saving checkpoint at %s with value %s", path, 
value));
-    return propertyStore.update(path, new CheckpointDataUpdater(path, value), 
AccessOption.PERSISTENT);
-  }
-
-  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
-    final String path;
-    final Long value;
-
-    public CheckpointDataUpdater(String path, Long value) {
-      this.path = path;
-      this.value = value;
-    }
-
-    @Override
-    public ZNRecord update(ZNRecord currentData) {
-      if (currentData == null) {
-        currentData = new ZNRecord(path);
-      }
-      currentData.setLongField(ZNODE_FIELD, value);
-      return currentData;
-    }
-  }
-
-  String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) {
-    StringBuilder sb = new StringBuilder("/");
-    sb.append(CHECKPOINT_PATH_PREFIX);
-    sb.append("/");
-    sb.append(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
-    return sb.toString();
-  }
-}

Reply via email to