http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
deleted file mode 100644
index 9cfba6e..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ /dev/null
@@ -1,495 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
-
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for all runnable aggregators. Provides common functions like
- * check pointing and scheduling.
- */
-public abstract class AbstractTimelineAggregator implements 
TimelineMetricAggregator {
-  protected final PhoenixHBaseAccessor hBaseAccessor;
-  protected final Logger LOG;
-  protected final long checkpointDelayMillis;
-  protected final Integer resultsetFetchSize;
-  protected Configuration metricsConf;
-  private String checkpointLocation;
-  private Long sleepIntervalMillis;
-  private Integer checkpointCutOffMultiplier;
-  private String aggregatorDisableParam;
-  protected String tableName;
-  protected String outputTableName;
-  protected Long nativeTimeRangeDelay;
-  protected AggregationTaskRunner taskRunner;
-  protected List<String> downsampleMetricPatterns;
-  protected List<CustomDownSampler> configuredDownSamplers;
-
-  // Explicitly name aggregators for logging needs
-  private final AGGREGATOR_NAME aggregatorName;
-
-  AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
-                             PhoenixHBaseAccessor hBaseAccessor,
-                             Configuration metricsConf) {
-    this.aggregatorName = aggregatorName;
-    this.hBaseAccessor = hBaseAccessor;
-    this.metricsConf = metricsConf;
-    this.checkpointDelayMillis = 
SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
-    this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
-    this.LOG = 
LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
-    this.configuredDownSamplers = 
DownSamplerUtils.getDownSamplers(metricsConf);
-    this.downsampleMetricPatterns = 
DownSamplerUtils.getDownsampleMetricPatterns(metricsConf);
-  }
-
-  public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
-                                    PhoenixHBaseAccessor hBaseAccessor,
-                                    Configuration metricsConf,
-                                    String checkpointLocation,
-                                    Long sleepIntervalMillis,
-                                    Integer checkpointCutOffMultiplier,
-                                    String aggregatorDisableParam,
-                                    String tableName,
-                                    String outputTableName,
-                                    Long nativeTimeRangeDelay,
-                                    MetricCollectorHAController haController) {
-    this(aggregatorName, hBaseAccessor, metricsConf);
-    this.checkpointLocation = checkpointLocation;
-    this.sleepIntervalMillis = sleepIntervalMillis;
-    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
-    this.aggregatorDisableParam = aggregatorDisableParam;
-    this.tableName = tableName;
-    this.outputTableName = outputTableName;
-    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
-    this.taskRunner = haController != null && haController.isInitialized() ?
-      haController.getAggregationTaskRunner() : null;
-  }
-
-  @Override
-  public void run() {
-    LOG.info("Started Timeline aggregator thread @ " + new Date());
-    Long SLEEP_INTERVAL = getSleepIntervalMillis();
-    runOnce(SLEEP_INTERVAL);
-  }
-
-  /**
-   * Access relaxed for tests
-   */
-  public void runOnce(Long SLEEP_INTERVAL) {
-    boolean performAggregationFunction = true;
-    if (taskRunner != null) {
-      switch (getAggregatorType()) {
-        case HOST:
-          performAggregationFunction = taskRunner.performsHostAggregation();
-          break;
-        case CLUSTER:
-          performAggregationFunction = taskRunner.performsClusterAggregation();
-      }
-    }
-
-    if (performAggregationFunction) {
-      long currentTime = System.currentTimeMillis();
-      long lastCheckPointTime = 
readLastCheckpointSavingOnFirstRun(currentTime);
-
-      if (lastCheckPointTime != -1) {
-        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
-          + ((currentTime - lastCheckPointTime) / 1000)
-          + " seconds.");
-
-        boolean success = doWork(lastCheckPointTime, lastCheckPointTime + 
SLEEP_INTERVAL);
-
-        if (success) {
-          try {
-            saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
-          } catch (IOException io) {
-            LOG.warn("Error saving checkpoint, restarting aggregation at " +
-              "previous checkpoint.");
-          }
-        }
-      }
-    } else {
-      LOG.info("Skipping aggregation function not owned by this instance.");
-    }
-  }
-
-  private long readLastCheckpointSavingOnFirstRun(long currentTime) {
-    long lastCheckPointTime = -1;
-
-    try {
-      lastCheckPointTime = readCheckPoint();
-      if (lastCheckPointTime != -1) {
-        LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
-        if (isLastCheckPointTooOld(currentTime, lastCheckPointTime)) {
-          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
-            "lastCheckPointTime = " + new Date(lastCheckPointTime));
-          lastCheckPointTime = 
getRoundedAggregateTimeMillis(getSleepIntervalMillis()) - 
getSleepIntervalMillis();
-          LOG.info("Saving checkpoint time. " + new 
Date((lastCheckPointTime)));
-          saveCheckPoint(lastCheckPointTime);
-
-        } else {
-
-          if (lastCheckPointTime > 0) {
-            lastCheckPointTime = 
getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
-            LOG.info("Rounded off checkpoint : " + new 
Date(lastCheckPointTime));
-          }
-
-          if (isLastCheckPointTooYoung(lastCheckPointTime)) {
-            LOG.info("Last checkpoint too recent for aggregation. Sleeping for 
1 cycle.");
-            return -1; //Skip Aggregation this time around
-          }
-        }
-      } else {
-        /*
-          No checkpoint. Save current rounded checkpoint and sleep for 1 cycle.
-         */
-        LOG.info("No checkpoint found");
-        long firstCheckPoint = 
getRoundedAggregateTimeMillis(getSleepIntervalMillis());
-        LOG.info("Saving checkpoint time. " + new Date((firstCheckPoint)));
-        saveCheckPoint(firstCheckPoint);
-      }
-    } catch (IOException io) {
-      LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
-    }
-    return lastCheckPointTime;
-  }
-
-  private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) {
-    // first checkpoint is saved checkpointDelayMillis in the past,
-    // so here we also need to take it into account
-    return checkpoint != -1 &&
-      ((currentTime - checkpoint) > getCheckpointCutOffIntervalMillis());
-  }
-
-  private boolean isLastCheckPointTooYoung(long checkpoint) {
-    return checkpoint != -1 &&
-      ((getRoundedAggregateTimeMillis(getSleepIntervalMillis()) <= 
checkpoint));
-  }
-
-  protected long readCheckPoint() {
-    if (taskRunner != null) {
-      return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName);
-    }
-    try {
-      File checkpoint = new File(getCheckpointLocation());
-      if (checkpoint.exists()) {
-        String contents = FileUtils.readFileToString(checkpoint);
-        if (contents != null && !contents.isEmpty()) {
-          return Long.parseLong(contents);
-        }
-      }
-    } catch (IOException io) {
-      LOG.debug("", io);
-    }
-    return -1;
-  }
-
-  protected void saveCheckPoint(long checkpointTime) throws IOException {
-    if (taskRunner != null) {
-      boolean success = 
taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, 
checkpointTime);
-      if (!success) {
-        LOG.error("Error saving checkpoint with AggregationTaskRunner, " +
-          "aggregator = " + aggregatorName + "value = " + checkpointTime);
-      }
-    } else {
-      File checkpoint = new File(getCheckpointLocation());
-      if (!checkpoint.exists()) {
-        boolean done = checkpoint.createNewFile();
-        if (!done) {
-          throw new IOException("Could not create checkpoint at location, " +
-            getCheckpointLocation());
-        }
-      }
-      FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
-    }
-  }
-
-  /**
-   * Read metrics written during the time interval and save the sum and total
-   * in the aggregate table.
-   *
-   * @param startTime Sample start time
-   * @param endTime Sample end time
-   */
-  public boolean doWork(long startTime, long endTime) {
-    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
-      "startTime = " + new Date(startTime) + ", endTime = " + new 
Date(endTime));
-
-    boolean success = true;
-    Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
-    Connection conn = null;
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-
-    try {
-      conn = hBaseAccessor.getConnection();
-      // FLUME 2. aggregate and ignore the instance
-      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-
-      LOG.debug("Query issued @: " + new Date());
-      if (condition.doUpdate()) {
-        int rows = stmt.executeUpdate();
-        conn.commit();
-        LOG.info(rows + " row(s) updated in aggregation.");
-
-        //TODO : Fix downsampling after UUID change.
-        //downsample(conn, startTime, endTime);
-      } else {
-        rs = stmt.executeQuery();
-      }
-      LOG.debug("Query returned @: " + new Date());
-
-      aggregate(rs, startTime, endTime);
-
-    } catch (Exception e) {
-      LOG.error("Exception during aggregating metrics.", e);
-      success = false;
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.info("End aggregation cycle @ " + new Date());
-    return success;
-  }
-
-  protected abstract Condition prepareMetricQueryCondition(long startTime, 
long endTime);
-
-  protected abstract void aggregate(ResultSet rs, long startTime, long 
endTime) throws IOException, SQLException;
-
-  protected void downsample(Connection conn, Long startTime, Long endTime) {
-
-    LOG.debug("Checking for downsampling requests.");
-    if (CollectionUtils.isEmpty(configuredDownSamplers)) {
-      LOG.debug("No downsamplers configured");
-      return;
-    }
-
-    // Generate UPSERT query prefix. UPSERT part of the query is needed on the 
Aggregator side.
-    // SELECT part of the query is provided by the downsampler.
-    String queryPrefix = 
PhoenixTransactSQL.DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX;
-    if (outputTableName.contains("RECORD")) {
-      queryPrefix = 
PhoenixTransactSQL.DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX;
-    }
-    queryPrefix = String.format(queryPrefix, outputTableName);
-
-    for (Iterator<CustomDownSampler> iterator = 
configuredDownSamplers.iterator(); iterator.hasNext();){
-      CustomDownSampler downSampler = iterator.next();
-
-      if (downSampler.validateConfigs()) {
-        EmptyCondition downSamplingCondition = new EmptyCondition();
-        downSamplingCondition.setDoUpdate(true);
-        List<String> stmts = 
downSampler.prepareDownSamplingStatement(startTime, endTime, tableName);
-        for (String stmt : stmts) {
-          downSamplingCondition.setStatement(queryPrefix + stmt);
-          runDownSamplerQuery(conn, downSamplingCondition);
-        }
-      } else {
-        LOG.warn("The following downsampler failed config validation : " + 
downSampler.getClass().getName() + "." +
-          "Removing it from downsamplers list.");
-        iterator.remove();
-      }
-    }
-
-  }
-
-  public Long getSleepIntervalMillis() {
-    return sleepIntervalMillis;
-  }
-
-  public void setSleepIntervalMillis(Long sleepIntervalMillis) {
-    this.sleepIntervalMillis = sleepIntervalMillis;
-  }
-
-  protected Integer getCheckpointCutOffMultiplier() {
-    return checkpointCutOffMultiplier;
-  }
-
-  protected Long getCheckpointCutOffIntervalMillis() {
-    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
-  }
-
-  public boolean isDisabled() {
-    return metricsConf.getBoolean(aggregatorDisableParam, false);
-  }
-
-  protected String getQueryHint(Long startTime) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("/*+ ");
-    sb.append("NATIVE_TIME_RANGE(");
-    sb.append(startTime - nativeTimeRangeDelay);
-    sb.append(") ");
-    if (hBaseAccessor.isSkipBlockCacheForAggregatorsEnabled()) {
-      sb.append("NO_CACHE ");
-    }
-    sb.append("*/");
-    return sb.toString();
-  }
-
-  protected String getCheckpointLocation() {
-    return checkpointLocation;
-  }
-
-  /**
-   * Run 1 downsampler query.
-   * @param conn
-   * @param condition
-   */
-  private void runDownSamplerQuery(Connection conn, Condition condition) {
-
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    LOG.debug("Downsampling query : " + condition.getStatement());
-
-    try {
-      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-
-      LOG.debug("Downsampler Query issued...");
-      if (condition.doUpdate()) {
-        int rows = stmt.executeUpdate();
-        conn.commit();
-        LOG.info(rows + " row(s) updated in downsampling.");
-      } else {
-        rs = stmt.executeQuery();
-      }
-      LOG.debug("Downsampler Query returned ...");
-      LOG.info("End Downsampling cycle.");
-
-    } catch (SQLException e) {
-      LOG.error("Exception during downsampling metrics.", e);
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  /**
-   * Returns the METRIC_NAME NOT LIKE clause if certain metrics or metric 
patterns are to be skipped
-   * since they will be downsampled.
-   * @return
-   */
-  protected String getDownsampledMetricSkipClause() {
-
-    //TODO Fix downsampling for UUID change.
-    return StringUtils.EMPTY;
-
-//    if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) {
-//      return StringUtils.EMPTY;
-//    }
-//
-//    StringBuilder sb = new StringBuilder();
-//
-//    for (int i = 0; i < downsampleMetricPatterns.size(); i++) {
-//      sb.append(" METRIC_NAME");
-//      sb.append(" NOT");
-//      sb.append(" LIKE ");
-//      sb.append("'" + downsampleMetricPatterns.get(i) + "'");
-//
-//      if (i < downsampleMetricPatterns.size() - 1) {
-//        sb.append(" AND ");
-//      }
-//    }
-//
-//    sb.append(" AND ");
-//    return sb.toString();
-  }
-
-  /**
-   * Get @AGGREGATOR_TYPE based on the output table.
-   * This is solely used by the HAController to determine which lock to 
acquire.
-   */
-  public AGGREGATOR_TYPE getAggregatorType() {
-    if (outputTableName.contains("RECORD")) {
-      return AGGREGATOR_TYPE.HOST;
-    } else if (outputTableName.contains("AGGREGATE")) {
-      return AGGREGATOR_TYPE.CLUSTER;
-    }
-    return null;
-  }
-
-  @Override
-  public AGGREGATOR_NAME getName() {
-    return aggregatorName;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
deleted file mode 100644
index b8338fb..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-/**
- *
- */
-public class AggregatorUtils {
-
-  private static final Log LOG = LogFactory.getLog(AggregatorUtils.class);
-
-  public static double[] calculateAggregates(Map<Long, Double> metricValues) {
-    double[] values = new double[4];
-    double max = Double.MIN_VALUE;
-    double min = Double.MAX_VALUE;
-    double sum = 0.0;
-    int metricCount = 0;
-
-    if (metricValues != null && !metricValues.isEmpty()) {
-      for (Double value : metricValues.values()) {
-        // TODO: Some nulls in data - need to investigate null values from host
-        if (value != null) {
-          if (value > max) {
-            max = value;
-          }
-          if (value < min) {
-            min = value;
-          }
-          sum += value;
-        }
-      }
-      metricCount = metricValues.values().size();
-    }
-    // BR: WHY ZERO is a good idea?
-    values[0] = sum;
-    values[1] = max != Double.MIN_VALUE ? max : 0.0;
-    values[2] = min != Double.MAX_VALUE ? min : 0.0;
-    values[3] = metricCount;
-
-    return values;
-  }
-
-  public static Map<TimelineClusterMetric, Double> sliceFromTimelineMetric(
-      TimelineMetric timelineMetric, List<Long[]> timeSlices, boolean 
interpolationEnabled) {
-
-    if (timelineMetric.getMetricValues().isEmpty()) {
-      return null;
-    }
-
-    Map<TimelineClusterMetric, Double> timelineClusterMetricMap =
-        new HashMap<>();
-
-    Long prevTimestamp = -1l;
-    TimelineClusterMetric prevMetric = null;
-    int count = 0;
-    double sum = 0.0;
-
-    Map<Long,Double> timeSliceValueMap = new HashMap<>();
-    for (Map.Entry<Long, Double> metric : 
timelineMetric.getMetricValues().entrySet()) {
-      if (metric.getValue() == null) {
-        continue;
-      }
-
-      Long timestamp = getSliceTimeForMetric(timeSlices, 
Long.parseLong(metric.getKey().toString()));
-      if (timestamp != -1) {
-        // Metric is within desired time range
-        TimelineClusterMetric clusterMetric = new TimelineClusterMetric(
-            timelineMetric.getMetricName(),
-            timelineMetric.getAppId(),
-            timelineMetric.getInstanceId(),
-            timestamp);
-
-        if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) {
-          Double newValue = metric.getValue();
-          if (newValue > 0.0) {
-            sum += newValue;
-            count++;
-          }
-        } else {
-          double metricValue = (count > 0) ? (sum / count) : 0.0;
-          timelineClusterMetricMap.put(prevMetric, metricValue);
-          timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue);
-          sum = metric.getValue();
-          count = sum > 0.0 ? 1 : 0;
-        }
-
-        prevTimestamp = timestamp;
-        prevMetric = clusterMetric;
-      }
-    }
-
-    if (prevTimestamp > 0) {
-      double metricValue = (count > 0) ? (sum / count) : 0.0;
-      timelineClusterMetricMap.put(prevMetric, metricValue);
-      timeSliceValueMap.put(prevTimestamp, metricValue);
-    }
-
-    if (interpolationEnabled) {
-      Map<Long, Double> interpolatedValues = 
interpolateMissingPeriods(timelineMetric.getMetricValues(), timeSlices, 
timeSliceValueMap, timelineMetric.getType());
-      for (Map.Entry<Long, Double> entry : interpolatedValues.entrySet()) {
-        TimelineClusterMetric timelineClusterMetric = new 
TimelineClusterMetric(timelineMetric.getMetricName(), 
timelineMetric.getAppId(), timelineMetric.getInstanceId(), entry.getKey());
-        timelineClusterMetricMap.putIfAbsent(timelineClusterMetric, 
entry.getValue());
-      }
-    }
-
-    return timelineClusterMetricMap;
-  }
-
-  private static Map<Long, Double> interpolateMissingPeriods(TreeMap<Long, 
Double> metricValues,
-                                               List<Long[]> timeSlices,
-                                               Map<Long, Double> 
timeSliceValueMap, String type) {
-    Map<Long, Double> resultClusterMetricMap = new HashMap<>();
-
-    if (StringUtils.isNotEmpty(type) && "COUNTER".equalsIgnoreCase(type)) {
-      //For Counter Based metrics, ok to do interpolation and extrapolation
-
-      List<Long> requiredTimestamps = new ArrayList<>();
-      for (Long[] timeSlice : timeSlices) {
-        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
-          requiredTimestamps.add(timeSlice[1]);
-        }
-      }
-      Map<Long, Double> interpolatedValuesMap = 
PostProcessingUtil.interpolate(metricValues, requiredTimestamps);
-
-      if (interpolatedValuesMap != null) {
-        for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) 
{
-          Double interpolatedValue = entry.getValue();
-
-          if (interpolatedValue != null) {
-            resultClusterMetricMap.put( entry.getKey(), interpolatedValue);
-          } else {
-            LOG.debug("Cannot compute interpolated value, hence skipping.");
-          }
-        }
-      }
-    } else {
-      //For other metrics, ok to do only interpolation
-
-      Double defaultNextSeenValue = null;
-      if (MapUtils.isEmpty(timeSliceValueMap) && 
MapUtils.isNotEmpty(metricValues)) {
-        //If no value was found within the start_time based slices, but the 
metric has value in the server_time range,
-        // use that.
-
-        Map.Entry<Long,Double> firstEntry  = metricValues.firstEntry();
-        defaultNextSeenValue = firstEntry.getValue();
-        LOG.debug("Found a data point outside timeslice range: " + new 
Date(firstEntry.getKey()) + ": " + defaultNextSeenValue);
-      }
-
-      for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) {
-        Long[] timeSlice = timeSlices.get(sliceNum);
-
-        if (!timeSliceValueMap.containsKey(timeSlice[1])) {
-          LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " 
+ new Date(timeSlice[1]));
-
-          Double lastSeenValue = null;
-          int index = sliceNum - 1;
-          Long[] prevTimeSlice = null;
-          while (lastSeenValue == null && index >= 0) {
-            prevTimeSlice = timeSlices.get(index--);
-            lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]);
-          }
-
-          Double nextSeenValue = null;
-          index = sliceNum + 1;
-          Long[] nextTimeSlice = null;
-          while (nextSeenValue == null && index < timeSlices.size()) {
-            nextTimeSlice = timeSlices.get(index++);
-            nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]);
-          }
-
-          if (nextSeenValue == null) {
-            nextSeenValue = defaultNextSeenValue;
-          }
-
-          Double interpolatedValue = 
PostProcessingUtil.interpolate(timeSlice[1],
-              (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue,
-              (nextTimeSlice != null ? nextTimeSlice[1] : null), 
nextSeenValue);
-
-          if (interpolatedValue != null) {
-            LOG.debug("Interpolated value : " + interpolatedValue);
-            resultClusterMetricMap.put(timeSlice[1], interpolatedValue);
-          } else {
-            LOG.debug("Cannot compute interpolated value, hence skipping.");
-          }
-        }
-      }
-    }
-    return resultClusterMetricMap;
-  }
-
-  /**
-   * Return end of the time slice into which the metric fits.
-   */
-  public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long 
timestamp) {
-    for (Long[] timeSlice : timeSlices) {
-      if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
-        return timeSlice[1];
-      }
-    }
-    return -1l;
-  }
-
-  /**
-   * Return time slices to normalize the timeseries data.
-   */
-  public static  List<Long[]> getTimeSlices(long startTime, long endTime, long 
timeSliceIntervalMillis) {
-    List<Long[]> timeSlices = new ArrayList<Long[]>();
-    long sliceStartTime = startTime;
-    while (sliceStartTime < endTime) {
-      timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + 
timeSliceIntervalMillis });
-      sliceStartTime += timeSliceIntervalMillis;
-    }
-    return timeSlices;
-  }
-
-  public static long getRoundedCheckPointTimeMillis(long referenceTime, long 
aggregatorPeriod) {
-    return referenceTime - (referenceTime % aggregatorPeriod);
-  }
-
-  public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
-    long currentTime = System.currentTimeMillis();
-    return currentTime - (currentTime % aggregatorPeriod);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java
deleted file mode 100644
index cfd82dd..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import java.util.List;
-
/**
 * Interface to add a custom downsampler.
 * Each configured downsampler will be executed during an aggregation cycle.
 */
public interface CustomDownSampler {

  /**
   * Gatekeeper to check the configs. If this fails, the downsampling is not
   * done.
   *
   * @return true when the downsampler configuration is valid
   */
  // Note: interface methods are implicitly public; the redundant modifier
  // is dropped per Java convention.
  boolean validateConfigs();

  /**
   * Return the set of statements that need to be executed for the
   * downsampling.
   *
   * @param startTime aggregation window start time
   * @param endTime   aggregation window end time
   * @param tableName name of the table the statements run against
   * @return statements to execute for this downsampling pass
   */
  List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName);

}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerUtils.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerUtils.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerUtils.java
deleted file mode 100644
index 23b1cb3..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * DownSampler utility class. Responsible for fetching downsampler configs 
from Metrics config, and determine if
- * any downsamplers are configured.
- */
-
-public class DownSamplerUtils {
-
-  public static final String downSamplerConfigPrefix = 
"timeline.metrics.downsampler.";
-  public static final String downSamplerMetricPatternsConfig = 
"metric.patterns";
-  public static final String topNDownSampler = "topn";
-  private static final Log LOG = LogFactory.getLog(DownSamplerUtils.class);
-
-
-
-  /**
-   * Get the list of metrics that are requested to be downsampled.
-   * @param configuration
-   * @return List of metric patterns/names that are to be downsampled.
-   */
-  public static List<String> getDownsampleMetricPatterns(Configuration 
configuration) {
-    Map<String, String> conf = 
configuration.getValByRegex(downSamplerConfigPrefix + "*");
-    List<String> metricPatterns = new ArrayList<>();
-    Set<String> keys = conf.keySet();
-    for (String key : keys) {
-      if (key.endsWith(downSamplerMetricPatternsConfig)) {
-        String patternString = conf.get(key);
-        String[] patterns = StringUtils.split(patternString, ",");
-        for (String pattern : patterns) {
-          if (StringUtils.isNotEmpty(pattern)) {
-            String trimmedPattern = pattern.trim();
-            metricPatterns.add(trimmedPattern);
-          }
-        }
-      }
-    }
-    return metricPatterns;
-  }
-
-  /**
-   * Get the list of downsamplers that are configured in ams-site
-   * Sample config
-   <name>timeline.metrics.downsampler.topn.metric.patterns</name>
-   
<value>dfs.NNTopUserOpCounts.windowMs=60000.op%,dfs.NNTopUserOpCounts.windowMs=300000.op%</value>
-
-   <name>timeline.metrics.downsampler.topn.value</name>
-   <value>10</value>
-
-   <name>timeline.metrics.downsampler.topn.function</name>
-   <value>max</value>
-   * @param configuration
-   * @return
-   */
-  public static List<CustomDownSampler> getDownSamplers(Configuration 
configuration) {
-
-    Map<String,String> conf = 
configuration.getValByRegex(downSamplerConfigPrefix + "*");
-    List<CustomDownSampler> downSamplers = new ArrayList<>();
-    Set<String> keys = conf.keySet();
-
-    try {
-      for (String key : keys) {
-        if (key.startsWith(downSamplerConfigPrefix) && 
key.endsWith(downSamplerMetricPatternsConfig)) {
-          String type = key.split("\\.")[3];
-          CustomDownSampler downSampler = getDownSamplerByType(type, conf);
-          if (downSampler != null) {
-            downSamplers.add(downSampler);
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.warn("Exception caught while parsing downsampler configs from 
ams-site : " + e.getMessage());
-    }
-    return downSamplers;
-  }
-
-  public static CustomDownSampler getDownSamplerByType(String type, 
Map<String, String> conf) {
-    if (type == null) {
-      return null;
-    }
-
-    if (StringUtils.isNotEmpty(type) && 
type.equalsIgnoreCase(topNDownSampler)) {
-      return TopNDownSampler.fromConfig(conf);
-    }
-
-    LOG.warn("Unknown downsampler requested : " + type);
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
deleted file mode 100644
index ab9d2e9..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import java.util.Arrays;
-
/**
 * Is used to determine metrics aggregate table.
 *
 * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
 * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics
 */
public class Function {
  // final: this is a shared default instance and must never be reassigned.
  public static final Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null);
  // Regex separating function suffixes (e.g. "._avg") from the metric name.
  private static final String SUFFIX_SEPARATOR = "\\._";

  private ReadFunction readFunction = ReadFunction.VALUE;
  private PostProcessingFunction postProcessingFunction = null;

  public Function() {
  }

  /**
   * @param readFunction aggregate read function; defaults to VALUE when null
   * @param ppFunction   optional post-processing function; may be null
   */
  public Function(ReadFunction readFunction,
                  PostProcessingFunction ppFunction) {
    if (readFunction != null) {
      this.readFunction = readFunction;
    }
    this.postProcessingFunction = ppFunction;
  }

  /**
   * Segregate post processing function eg: rate from aggregate function,
   * example: avg, in any order
   * @param metricName metric name from request, e.g. "Metric._rate._avg"
   * @return parsed {@link Function}
   * @throws IllegalArgumentException when more than two suffixes are present
   * @throws FunctionFormatException when suffixes are present but none parses
   */
  public static Function fromMetricName(String metricName) {
    // gets postprocessing, and aggregation function
    // ex. Metric._rate._avg
    String[] parts = metricName.split(SUFFIX_SEPARATOR);

    ReadFunction readFunction = ReadFunction.VALUE;
    PostProcessingFunction ppFunction = null;

    if (parts.length <= 1) {
      // No suffix present: plain metric name, read raw values.
      return new Function(readFunction, null);
    }
    if (parts.length > 3) {
      throw new IllegalArgumentException("Invalid number of functions specified.");
    }

    // Parse functions
    boolean isSuccessful = false; // Best effort
    for (String part : parts) {
      if (ReadFunction.isPresent(part)) {
        readFunction = ReadFunction.getFunction(part);
        isSuccessful = true;
      }
      if (PostProcessingFunction.isPresent(part)) {
        ppFunction = PostProcessingFunction.getFunction(part);
        isSuccessful = true;
      }
    }

    // Throw exception if parsing failed
    if (!isSuccessful) {
      throw new FunctionFormatException("Could not parse provided functions: " +
        "" + Arrays.asList(parts));
    }

    return new Function(readFunction, ppFunction);
  }

  /**
   * @return the combined suffix, post-processing part first
   *         (e.g. "._rate._avg"); empty string for the plain VALUE function
   */
  public String getSuffix() {
    return (postProcessingFunction == null) ? readFunction.getSuffix() :
      postProcessingFunction.getSuffix() + readFunction.getSuffix();
  }

  public ReadFunction getReadFunction() {
    return readFunction;
  }

  @Override
  public String toString() {
    return "Function{" +
      "readFunction=" + readFunction +
      ", postProcessingFunction=" + postProcessingFunction +
      '}';
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof Function)) return false;

    Function function = (Function) o;

    // Enum identity comparison is safe here.
    return postProcessingFunction == function.postProcessingFunction
      && readFunction == function.readFunction;

  }

  @Override
  public int hashCode() {
    int result = readFunction.hashCode();
    result = 31 * result + (postProcessingFunction != null ?
      postProcessingFunction.hashCode() : 0);
    return result;
  }

  /** Optional transformation applied after the aggregate is read. */
  public enum PostProcessingFunction {
    NONE(""),
    RATE("._rate"),
    DIFF("._diff");

    PostProcessingFunction(String suffix) {
      this.suffix = suffix;
    }

    private String suffix = "";

    public String getSuffix() {
      return suffix;
    }

    /** @return true when functionName names a member of this enum (case-insensitive) */
    public static boolean isPresent(String functionName) {
      try {
        PostProcessingFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        return false;
      }
      return true;
    }

    /**
     * @return the matching function; NONE for a null name
     * @throws FunctionFormatException when the name does not parse
     */
    public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException {
      if (functionName == null) {
        return NONE;
      }

      try {
        return PostProcessingFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        throw new FunctionFormatException("Function should be ._rate", e);
      }
    }
  }

  /** Aggregate read function selecting which value column to read. */
  public enum ReadFunction {
    VALUE(""),
    AVG("._avg"),
    MIN("._min"),
    MAX("._max"),
    SUM("._sum");

    private final String suffix;

    ReadFunction(String suffix) {
      this.suffix = suffix;
    }

    public String getSuffix() {
      return suffix;
    }

    /** @return true when functionName names a member of this enum (case-insensitive) */
    public static boolean isPresent(String functionName) {
      try {
        ReadFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        return false;
      }
      return true;
    }

    /**
     * @return the matching function; VALUE for a null name
     * @throws FunctionFormatException when the name does not parse
     */
    public static ReadFunction getFunction(String functionName) throws FunctionFormatException {
      if (functionName == null) {
        return VALUE;
      }
      try {
        return ReadFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        throw new FunctionFormatException(
          "Function should be sum, avg, min, max. Got " + functionName, e);
      }
    }
  }

  /** Thrown when a function suffix cannot be parsed. */
  public static class FunctionFormatException extends IllegalArgumentException {
    public FunctionFormatException(String message) {
      super(message);
    }

    public FunctionFormatException(String message, Throwable cause) {
      super(message, cause);
    }
  }

}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
deleted file mode 100644
index 6e793e1..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-public class TimelineClusterMetric {
-  private String metricName;
-  private String appId;
-  private String instanceId;
-  private long timestamp;
-
-  public TimelineClusterMetric(String metricName, String appId, String 
instanceId,
-                        long timestamp) {
-    this.metricName = metricName;
-    this.appId = appId;
-    this.instanceId = instanceId;
-    this.timestamp = timestamp;
-  }
-
-  public String getMetricName() {
-    return metricName;
-  }
-
-  public String getAppId() {
-    return appId;
-  }
-
-  public String getInstanceId() {
-    return instanceId;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    TimelineClusterMetric that = (TimelineClusterMetric) o;
-
-    if (timestamp != that.timestamp) return false;
-    if (appId != null ? !appId.equals(that.appId) : that.appId != null)
-      return false;
-    if (instanceId != null ? !instanceId.equals(that.instanceId) : 
that.instanceId != null)
-      return false;
-    if (!metricName.equals(that.metricName)) return false;
-
-    return true;
-  }
-
-  public boolean equalsExceptTime(TimelineClusterMetric metric) {
-    if (!metricName.equals(metric.metricName)) return false;
-    if (!appId.equals(metric.appId)) return false;
-    if (instanceId != null ? !instanceId.equals(metric.instanceId) : 
metric.instanceId != null)
-      return false;
-
-    return true;
-  }
-  @Override
-  public int hashCode() {
-    int result = metricName.hashCode();
-    result = 31 * result + (appId != null ? appId.hashCode() : 0);
-    result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
-    result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return "TimelineClusterMetric{" +
-      "metricName='" + metricName + '\'' +
-      ", appId='" + appId + '\'' +
-      ", instanceId='" + instanceId + '\'' +
-      ", timestamp=" + timestamp +
-      '}';
-  }
-
-  public void setTimestamp(long timestamp) {
-    this.timestamp = timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
deleted file mode 100644
index 150e3f1..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
/**
 * A metric aggregator task. Implementations aggregate metric data over a time
 * window; extending {@link Runnable} allows them to be scheduled directly.
 */
public interface TimelineMetricAggregator extends Runnable {
  /**
   * Aggregate metric data within the time bounds.
   *
   * @param startTime start time millis
   * @param endTime   end time millis
   * @return true on success
   */
  boolean doWork(long startTime, long endTime);

  /**
   * Whether the aggregator is disabled by configuration.
   *
   * @return true/false
   */
  boolean isDisabled();

  /**
   * Return the aggregator interval.
   *
   * @return interval in millis
   */
  Long getSleepIntervalMillis();

  /**
   * Get the aggregator name.
   *
   * @return the {@link AGGREGATOR_NAME} identifying this aggregator
   */
  AGGREGATOR_NAME getName();

  /**
   * Known aggregator types.
   */
  enum AGGREGATOR_TYPE {
    CLUSTER,
    HOST
  }
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
deleted file mode 100644
index 3728d19..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ /dev/null
@@ -1,528 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricDistributedCache;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-
-/**
- * Factory class that knows how to create a aggregator instance using
- * TimelineMetricConfiguration
- */
-public class TimelineMetricAggregatorFactory {
-  private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-checkpoint";
-  private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-hourly-checkpoint";
-  private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE =
-    "timeline-metrics-host-aggregator-daily-checkpoint";
-  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-checkpoint";
-  private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-minute-checkpoint";
-  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
-  private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
-    "timeline-metrics-cluster-aggregator-daily-checkpoint";
-
-  private static boolean useGroupByAggregator(Configuration metricsConf) {
-    return 
Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"));
-  }
-
-  /**
-   * Minute based aggregation for hosts.
-   * Interval : 5 mins
-   */
-  public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricMetadataManager metadataManager,
-     MetricCollectorHAController haController) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
-
-    String inputTableName = METRICS_RECORD_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        METRIC_RECORD_MINUTE,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        hostAggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        120000l,
-        haController
-      );
-    }
-
-    return new TimelineMetricHostAggregator(
-      METRIC_RECORD_MINUTE,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l,
-      haController);
-  }
-
-  /**
-   * Hourly aggregation for hosts.
-   * Interval : 1 hour
-   */
-  public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricMetadataManager metadataManager,
-     MetricCollectorHAController haController) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
-
-    String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        METRIC_RECORD_HOURLY,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        hostAggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        3600000l,
-        haController
-      );
-    }
-
-    return new TimelineMetricHostAggregator(
-      METRIC_RECORD_HOURLY,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      3600000l,
-      haController);
-  }
-
-  /**
-   * Daily aggregation for hosts.
-   * Interval : 1 day
-   */
-  public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
-    (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricMetadataManager metadataManager,
-     MetricCollectorHAController haController) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      HOST_AGGREGATE_DAILY_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED;
-
-    String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator(
-        METRIC_RECORD_DAILY,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        hostAggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        3600000l,
-        haController
-      );
-    }
-
-    return new TimelineMetricHostAggregator(
-      METRIC_RECORD_DAILY,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      3600000l,
-      haController);
-  }
-
-  /**
-   * Second aggregation for cluster.
-   * Interval : 2 mins
-   * Timeslice : 30 sec
-   */
-  public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricMetadataManager metadataManager,
-    MetricCollectorHAController haController,
-    TimelineMetricDistributedCache distributedCache) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);
-
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l));
-
-    long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt
-      (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
-
-    int checkpointCutOffMultiplier =
-      
metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-
-    String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED;
-
-    // Second based aggregation have added responsibility of time slicing
-    if 
(TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled())
 {
-      return new TimelineMetricClusterAggregatorSecondWithCacheSource(
-        METRIC_AGGREGATE_SECOND,
-        metadataManager,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        aggregatorDisabledParam,
-        null,
-        outputTableName,
-        120000l,
-        timeSliceIntervalMillis,
-        haController,
-        distributedCache
-      );
-    }
-
-    String inputTableName = METRICS_RECORD_TABLE_NAME;
-    return new TimelineMetricClusterAggregatorSecond(
-      METRIC_AGGREGATE_SECOND,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      aggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l,
-      timeSliceIntervalMillis,
-      haController
-    );
-  }
-
-  /**
-   * Minute aggregation for cluster.
-   * Interval : 5 mins
-   */
-  public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricMetadataManager metadataManager,
-    MetricCollectorHAController haController) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE);
-
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-
-    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-    String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
-    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        METRIC_AGGREGATE_MINUTE,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        aggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        120000l,
-        haController
-      );
-    }
-
-    return new TimelineMetricClusterAggregator(
-      METRIC_AGGREGATE_MINUTE,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      aggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l,
-      haController
-    );
-  }
-
-  /**
-   * Hourly aggregation for cluster.
-   * Interval : 1 hour
-   */
-  public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricMetadataManager metadataManager,
-    MetricCollectorHAController haController) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE);
-
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
-
-    String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-    String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        METRIC_AGGREGATE_HOURLY,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        aggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        120000l,
-        haController
-      );
-    }
-
-    return new TimelineMetricClusterAggregator(
-      METRIC_AGGREGATE_HOURLY,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      aggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l,
-      haController
-    );
-  }
-
-  /**
-   * Daily aggregation for cluster.
-   * Interval : 1 day
-   */
-  public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
-    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricMetadataManager metadataManager,
-    MetricCollectorHAController haController) {
-
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE);
-
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l));
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1);
-
-    String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-    String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
-    String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator(
-        METRIC_AGGREGATE_DAILY,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        aggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        120000l,
-        haController
-      );
-    }
-
-    return new TimelineMetricClusterAggregator(
-      METRIC_AGGREGATE_DAILY,
-      metadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      aggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l,
-      haController
-    );
-  }
-
-  public static TimelineMetricAggregator 
createFilteringTimelineMetricAggregatorMinute(PhoenixHBaseAccessor 
hBaseAccessor, Configuration metricsConf, TimelineMetricMetadataManager 
metricMetadataManager, MetricCollectorHAController haController, 
ConcurrentHashMap<String, Long> postedAggregatedMap) {
-    String checkpointDir = metricsConf.get(
-      TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-    String checkpointLocation = FilenameUtils.concat(checkpointDir,
-      HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE);
-    long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
-      (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l));  // 5 mins
-
-    int checkpointCutOffMultiplier = metricsConf.getInt
-      (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
-    String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
-
-    String inputTableName = METRICS_RECORD_TABLE_NAME;
-    String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-
-    if (useGroupByAggregator(metricsConf)) {
-      return new 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricFilteringHostAggregator(
-        METRIC_RECORD_MINUTE,
-        metricMetadataManager,
-        hBaseAccessor, metricsConf,
-        checkpointLocation,
-        sleepIntervalMillis,
-        checkpointCutOffMultiplier,
-        hostAggregatorDisabledParam,
-        inputTableName,
-        outputTableName,
-        120000l,
-        haController,
-        postedAggregatedMap
-      );
-    }
-
-    return new TimelineMetricFilteringHostAggregator(
-      METRIC_RECORD_MINUTE,
-      metricMetadataManager,
-      hBaseAccessor, metricsConf,
-      checkpointLocation,
-      sleepIntervalMillis,
-      checkpointCutOffMultiplier,
-      hostAggregatorDisabledParam,
-      inputTableName,
-      outputTableName,
-      120000l,
-      haController,
-      postedAggregatedMap);
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
deleted file mode 100644
index b06b147..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-
-/**
- * Aggregator responsible for providing app level host aggregates. This task
- * is accomplished without doing a round trip to storage, rather
- * TimelineMetricClusterAggregators are responsible for lifecycle of
- * @TimelineMetricAppAggregator and provide the raw data to aggregate.
- */
-public class TimelineMetricAppAggregator {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricAppAggregator.class);
-  // Lookup to check candidacy of an app
-  private final List<String> appIdsToAggregate;
-  private final Map<String, TimelineMetricHostMetadata> hostMetadata;
-  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = 
new HashMap<>();
-  TimelineMetricMetadataManager metadataManagerInstance;
-
-  public TimelineMetricAppAggregator(TimelineMetricMetadataManager 
metadataManager,
-                                     Configuration metricsConf) {
-    appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
-    hostMetadata = metadataManager.getHostedAppsCache();
-    metadataManagerInstance = metadataManager;
-    LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
-  }
-
-  /**
-   * Lifecycle method to initialize aggregation cycle.
-   */
-  public void init() {
-    LOG.debug("Initializing aggregation cycle.");
-    aggregateClusterMetrics = new HashMap<>();
-  }
-
-  /**
-   * Lifecycle method to indicate end of aggregation cycle.
-   */
-  public void cleanup() {
-    LOG.debug("Cleanup aggregated data.");
-    aggregateClusterMetrics = null;
-  }
-
-  /**
-   * Calculate aggregates if the clusterMetric is a Host metric for recorded
-   * apps that are housed by this host.
-   *
-   * @param clusterMetric @TimelineClusterMetric Host / App metric
-   * @param hostname This is the hostname from which this clusterMetric 
originated.
-   * @param metricValue The metric value for this metric.
-   */
-  public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
-                                           String hostname, Double 
metricValue) {
-
-    String appId = clusterMetric.getAppId();
-    if (appId == null) {
-      return; // No real use case except tests
-    }
-
-    // If metric is a host metric and host has apps on it
-    if (appId.equalsIgnoreCase(HOST_APP_ID)) {
-      // Candidate metric, update app aggregates
-      if (hostMetadata.containsKey(hostname)) {
-        updateAppAggregatesFromHostMetric(clusterMetric, hostname, 
metricValue);
-      }
-    } else {
-      // Build the hostedapps map if not a host metric
-      // Check app candidacy for host aggregation
-      if (appIdsToAggregate.contains(appId)) {
-        TimelineMetricHostMetadata timelineMetricHostMetadata = 
hostMetadata.get(hostname);
-        ConcurrentHashMap<String, String> appIds;
-        if (timelineMetricHostMetadata == null) {
-          appIds = new ConcurrentHashMap<>();
-          hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds));
-        } else {
-          appIds = timelineMetricHostMetadata.getHostedApps();
-        }
-        if (!appIds.containsKey(appId)) {
-          appIds.put(appId, appId);
-          LOG.info("Adding appId to hosted apps: appId = " +
-            clusterMetric.getAppId() + ", hostname = " + hostname);
-        }
-      }
-    }
-  }
-
-  /**
-   * Build a cluster app metric from a host metric
-   */
-  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric 
clusterMetric,
-                                                 String hostname, Double 
metricValue) {
-
-    if (aggregateClusterMetrics == null) {
-      LOG.error("Aggregation requested without init call.");
-      return;
-    }
-
-    TimelineMetricMetadataKey appKey =  new 
TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, 
clusterMetric.getInstanceId());
-    ConcurrentHashMap<String, String> apps = 
hostMetadata.get(hostname).getHostedApps();
-    for (String appId : apps.keySet()) {
-      if (appIdsToAggregate.contains(appId)) {
-
-        appKey.setAppId(appId);
-        TimelineMetricMetadata appMetadata = 
metadataManagerInstance.getMetadataCacheValue(appKey);
-        if (appMetadata == null) {
-          TimelineMetricMetadataKey key = new 
TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, 
clusterMetric.getInstanceId());
-          TimelineMetricMetadata hostMetricMetadata = 
metadataManagerInstance.getMetadataCacheValue(key);
-
-          if (hostMetricMetadata != null) {
-            TimelineMetricMetadata timelineMetricMetadata = new 
TimelineMetricMetadata(clusterMetric.getMetricName(),
-              appId, clusterMetric.getInstanceId(), 
hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), 
hostMetricMetadata.getSeriesStartTime(),
-              hostMetricMetadata.isSupportsAggregates(), 
TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId));
-            
metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
-          }
-        }
-
-        // Add a new cluster aggregate metric if none exists
-        TimelineClusterMetric appTimelineClusterMetric =
-          new TimelineClusterMetric(clusterMetric.getMetricName(),
-            appId,
-            clusterMetric.getInstanceId(),
-            clusterMetric.getTimestamp());
-
-        MetricClusterAggregate clusterAggregate = 
aggregateClusterMetrics.get(appTimelineClusterMetric);
-
-        if (clusterAggregate == null) {
-          clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, 
metricValue, metricValue);
-          aggregateClusterMetrics.put(appTimelineClusterMetric, 
clusterAggregate);
-        } else {
-          clusterAggregate.updateSum(metricValue);
-          clusterAggregate.updateNumberOfHosts(1);
-          clusterAggregate.updateMax(metricValue);
-          clusterAggregate.updateMin(metricValue);
-        }
-      }
-
-    }
-  }
-
-  /**
-   * Return current copy of aggregated data.
-   */
-  public Map<TimelineClusterMetric, MetricClusterAggregate> 
getAggregateClusterMetrics() {
-    return aggregateClusterMetrics;
-  }
-
-  private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
-    String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
-    if (!StringUtils.isEmpty(appIds)) {
-      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
-    }
-    return Collections.emptyList();
-  }
-}

Reply via email to