http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java deleted file mode 100644 index 9cfba6e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ /dev/null @@ -1,495 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; - -import java.io.File; -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.Date; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for all runnable aggregators. Provides common functions like - * check pointing and scheduling. - */ -public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator { - protected final PhoenixHBaseAccessor hBaseAccessor; - protected final Logger LOG; - protected final long checkpointDelayMillis; - protected final Integer resultsetFetchSize; - protected Configuration metricsConf; - private String checkpointLocation; - private Long sleepIntervalMillis; - private Integer checkpointCutOffMultiplier; - private String aggregatorDisableParam; - protected String tableName; - protected String outputTableName; - protected Long nativeTimeRangeDelay; - protected AggregationTaskRunner taskRunner; - protected List<String> downsampleMetricPatterns; - protected List<CustomDownSampler> configuredDownSamplers; - - // Explicitly name aggregators for logging needs - private final AGGREGATOR_NAME aggregatorName; - - AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName, - PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf) { - this.aggregatorName = aggregatorName; - this.hBaseAccessor = hBaseAccessor; - this.metricsConf = metricsConf; - this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); - this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); - this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName)); - this.configuredDownSamplers = DownSamplerUtils.getDownSamplers(metricsConf); - this.downsampleMetricPatterns = DownSamplerUtils.getDownsampleMetricPatterns(metricsConf); - } - - public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName, - PhoenixHBaseAccessor hBaseAccessor, 
- Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, - Integer checkpointCutOffMultiplier, - String aggregatorDisableParam, - String tableName, - String outputTableName, - Long nativeTimeRangeDelay, - MetricCollectorHAController haController) { - this(aggregatorName, hBaseAccessor, metricsConf); - this.checkpointLocation = checkpointLocation; - this.sleepIntervalMillis = sleepIntervalMillis; - this.checkpointCutOffMultiplier = checkpointCutOffMultiplier; - this.aggregatorDisableParam = aggregatorDisableParam; - this.tableName = tableName; - this.outputTableName = outputTableName; - this.nativeTimeRangeDelay = nativeTimeRangeDelay; - this.taskRunner = haController != null && haController.isInitialized() ? - haController.getAggregationTaskRunner() : null; - } - - @Override - public void run() { - LOG.info("Started Timeline aggregator thread @ " + new Date()); - Long SLEEP_INTERVAL = getSleepIntervalMillis(); - runOnce(SLEEP_INTERVAL); - } - - /** - * Access relaxed for tests - */ - public void runOnce(Long SLEEP_INTERVAL) { - boolean performAggregationFunction = true; - if (taskRunner != null) { - switch (getAggregatorType()) { - case HOST: - performAggregationFunction = taskRunner.performsHostAggregation(); - break; - case CLUSTER: - performAggregationFunction = taskRunner.performsClusterAggregation(); - } - } - - if (performAggregationFunction) { - long currentTime = System.currentTimeMillis(); - long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); - - if (lastCheckPointTime != -1) { - LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " - + ((currentTime - lastCheckPointTime) / 1000) - + " seconds."); - - boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); - - if (success) { - try { - saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); - } catch (IOException io) { - LOG.warn("Error saving checkpoint, restarting aggregation at " + - "previous checkpoint."); - } - 
} - } - } else { - LOG.info("Skipping aggregation function not owned by this instance."); - } - } - - private long readLastCheckpointSavingOnFirstRun(long currentTime) { - long lastCheckPointTime = -1; - - try { - lastCheckPointTime = readCheckPoint(); - if (lastCheckPointTime != -1) { - LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime)); - if (isLastCheckPointTooOld(currentTime, lastCheckPointTime)) { - LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + - "lastCheckPointTime = " + new Date(lastCheckPointTime)); - lastCheckPointTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()) - getSleepIntervalMillis(); - LOG.info("Saving checkpoint time. " + new Date((lastCheckPointTime))); - saveCheckPoint(lastCheckPointTime); - - } else { - - if (lastCheckPointTime > 0) { - lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis()); - LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime)); - } - - if (isLastCheckPointTooYoung(lastCheckPointTime)) { - LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle."); - return -1; //Skip Aggregation this time around - } - } - } else { - /* - No checkpoint. Save current rounded checkpoint and sleep for 1 cycle. - */ - LOG.info("No checkpoint found"); - long firstCheckPoint = getRoundedAggregateTimeMillis(getSleepIntervalMillis()); - LOG.info("Saving checkpoint time. " + new Date((firstCheckPoint))); - saveCheckPoint(firstCheckPoint); - } - } catch (IOException io) { - LOG.warn("Unable to write last checkpoint time. 
Resuming sleep.", io); - } - return lastCheckPointTime; - } - - private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) { - // first checkpoint is saved checkpointDelayMillis in the past, - // so here we also need to take it into account - return checkpoint != -1 && - ((currentTime - checkpoint) > getCheckpointCutOffIntervalMillis()); - } - - private boolean isLastCheckPointTooYoung(long checkpoint) { - return checkpoint != -1 && - ((getRoundedAggregateTimeMillis(getSleepIntervalMillis()) <= checkpoint)); - } - - protected long readCheckPoint() { - if (taskRunner != null) { - return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName); - } - try { - File checkpoint = new File(getCheckpointLocation()); - if (checkpoint.exists()) { - String contents = FileUtils.readFileToString(checkpoint); - if (contents != null && !contents.isEmpty()) { - return Long.parseLong(contents); - } - } - } catch (IOException io) { - LOG.debug("", io); - } - return -1; - } - - protected void saveCheckPoint(long checkpointTime) throws IOException { - if (taskRunner != null) { - boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime); - if (!success) { - LOG.error("Error saving checkpoint with AggregationTaskRunner, " + - "aggregator = " + aggregatorName + "value = " + checkpointTime); - } - } else { - File checkpoint = new File(getCheckpointLocation()); - if (!checkpoint.exists()) { - boolean done = checkpoint.createNewFile(); - if (!done) { - throw new IOException("Could not create checkpoint at location, " + - getCheckpointLocation()); - } - } - FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); - } - } - - /** - * Read metrics written during the time interval and save the sum and total - * in the aggregate table. 
- * - * @param startTime Sample start time - * @param endTime Sample end time - */ - public boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date() + ", " + - "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); - - boolean success = true; - Condition condition = prepareMetricQueryCondition(startTime, endTime); - - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - - try { - conn = hBaseAccessor.getConnection(); - // FLUME 2. aggregate and ignore the instance - stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - - LOG.debug("Query issued @: " + new Date()); - if (condition.doUpdate()) { - int rows = stmt.executeUpdate(); - conn.commit(); - LOG.info(rows + " row(s) updated in aggregation."); - - //TODO : Fix downsampling after UUID change. - //downsample(conn, startTime, endTime); - } else { - rs = stmt.executeQuery(); - } - LOG.debug("Query returned @: " + new Date()); - - aggregate(rs, startTime, endTime); - - } catch (Exception e) { - LOG.error("Exception during aggregating metrics.", e); - success = false; - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - // Ignore - } - } - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - - LOG.info("End aggregation cycle @ " + new Date()); - return success; - } - - protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime); - - protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException; - - protected void downsample(Connection conn, Long startTime, Long endTime) { - - LOG.debug("Checking for downsampling requests."); - if (CollectionUtils.isEmpty(configuredDownSamplers)) { - LOG.debug("No downsamplers configured"); - return; - } - - // Generate UPSERT 
query prefix. UPSERT part of the query is needed on the Aggregator side. - // SELECT part of the query is provided by the downsampler. - String queryPrefix = PhoenixTransactSQL.DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX; - if (outputTableName.contains("RECORD")) { - queryPrefix = PhoenixTransactSQL.DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX; - } - queryPrefix = String.format(queryPrefix, outputTableName); - - for (Iterator<CustomDownSampler> iterator = configuredDownSamplers.iterator(); iterator.hasNext();){ - CustomDownSampler downSampler = iterator.next(); - - if (downSampler.validateConfigs()) { - EmptyCondition downSamplingCondition = new EmptyCondition(); - downSamplingCondition.setDoUpdate(true); - List<String> stmts = downSampler.prepareDownSamplingStatement(startTime, endTime, tableName); - for (String stmt : stmts) { - downSamplingCondition.setStatement(queryPrefix + stmt); - runDownSamplerQuery(conn, downSamplingCondition); - } - } else { - LOG.warn("The following downsampler failed config validation : " + downSampler.getClass().getName() + "." 
+ - "Removing it from downsamplers list."); - iterator.remove(); - } - } - - } - - public Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - public void setSleepIntervalMillis(Long sleepIntervalMillis) { - this.sleepIntervalMillis = sleepIntervalMillis; - } - - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - protected Long getCheckpointCutOffIntervalMillis() { - return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); - } - - public boolean isDisabled() { - return metricsConf.getBoolean(aggregatorDisableParam, false); - } - - protected String getQueryHint(Long startTime) { - StringBuilder sb = new StringBuilder(); - sb.append("/*+ "); - sb.append("NATIVE_TIME_RANGE("); - sb.append(startTime - nativeTimeRangeDelay); - sb.append(") "); - if (hBaseAccessor.isSkipBlockCacheForAggregatorsEnabled()) { - sb.append("NO_CACHE "); - } - sb.append("*/"); - return sb.toString(); - } - - protected String getCheckpointLocation() { - return checkpointLocation; - } - - /** - * Run 1 downsampler query. 
- * @param conn - * @param condition - */ - private void runDownSamplerQuery(Connection conn, Condition condition) { - - PreparedStatement stmt = null; - ResultSet rs = null; - LOG.debug("Downsampling query : " + condition.getStatement()); - - try { - stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - - LOG.debug("Downsampler Query issued..."); - if (condition.doUpdate()) { - int rows = stmt.executeUpdate(); - conn.commit(); - LOG.info(rows + " row(s) updated in downsampling."); - } else { - rs = stmt.executeQuery(); - } - LOG.debug("Downsampler Query returned ..."); - LOG.info("End Downsampling cycle."); - - } catch (SQLException e) { - LOG.error("Exception during downsampling metrics.", e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - // Ignore - } - } - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - // Ignore - } - } - if (conn != null) { - try { - conn.close(); - } catch (SQLException sql) { - // Ignore - } - } - } - } - - /** - * Returns the METRIC_NAME NOT LIKE clause if certain metrics or metric patterns are to be skipped - * since they will be downsampled. - * @return - */ - protected String getDownsampledMetricSkipClause() { - - //TODO Fix downsampling for UUID change. - return StringUtils.EMPTY; - -// if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) { -// return StringUtils.EMPTY; -// } -// -// StringBuilder sb = new StringBuilder(); -// -// for (int i = 0; i < downsampleMetricPatterns.size(); i++) { -// sb.append(" METRIC_NAME"); -// sb.append(" NOT"); -// sb.append(" LIKE "); -// sb.append("'" + downsampleMetricPatterns.get(i) + "'"); -// -// if (i < downsampleMetricPatterns.size() - 1) { -// sb.append(" AND "); -// } -// } -// -// sb.append(" AND "); -// return sb.toString(); - } - - /** - * Get @AGGREGATOR_TYPE based on the output table. - * This is solely used by the HAController to determine which lock to acquire. 
- */ - public AGGREGATOR_TYPE getAggregatorType() { - if (outputTableName.contains("RECORD")) { - return AGGREGATOR_TYPE.HOST; - } else if (outputTableName.contains("AGGREGATE")) { - return AGGREGATOR_TYPE.CLUSTER; - } - return null; - } - - @Override - public AGGREGATOR_NAME getName() { - return aggregatorName; - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java deleted file mode 100644 index b8338fb..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; - -/** - * - */ -public class AggregatorUtils { - - private static final Log LOG = LogFactory.getLog(AggregatorUtils.class); - - public static double[] calculateAggregates(Map<Long, Double> metricValues) { - double[] values = new double[4]; - double max = Double.MIN_VALUE; - double min = Double.MAX_VALUE; - double sum = 0.0; - int metricCount = 0; - - if (metricValues != null && !metricValues.isEmpty()) { - for (Double value : metricValues.values()) { - // TODO: Some nulls in data - need to investigate null values from host - if (value != null) { - if (value > max) { - max = value; - } - if (value < min) { - min = value; - } - sum += value; - } - } - metricCount = metricValues.values().size(); - } - // BR: WHY ZERO is a good idea? - values[0] = sum; - values[1] = max != Double.MIN_VALUE ? max : 0.0; - values[2] = min != Double.MAX_VALUE ? 
min : 0.0; - values[3] = metricCount; - - return values; - } - - public static Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( - TimelineMetric timelineMetric, List<Long[]> timeSlices, boolean interpolationEnabled) { - - if (timelineMetric.getMetricValues().isEmpty()) { - return null; - } - - Map<TimelineClusterMetric, Double> timelineClusterMetricMap = - new HashMap<>(); - - Long prevTimestamp = -1l; - TimelineClusterMetric prevMetric = null; - int count = 0; - double sum = 0.0; - - Map<Long,Double> timeSliceValueMap = new HashMap<>(); - for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { - if (metric.getValue() == null) { - continue; - } - - Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString())); - if (timestamp != -1) { - // Metric is within desired time range - TimelineClusterMetric clusterMetric = new TimelineClusterMetric( - timelineMetric.getMetricName(), - timelineMetric.getAppId(), - timelineMetric.getInstanceId(), - timestamp); - - if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) { - Double newValue = metric.getValue(); - if (newValue > 0.0) { - sum += newValue; - count++; - } - } else { - double metricValue = (count > 0) ? (sum / count) : 0.0; - timelineClusterMetricMap.put(prevMetric, metricValue); - timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue); - sum = metric.getValue(); - count = sum > 0.0 ? 1 : 0; - } - - prevTimestamp = timestamp; - prevMetric = clusterMetric; - } - } - - if (prevTimestamp > 0) { - double metricValue = (count > 0) ? 
(sum / count) : 0.0; - timelineClusterMetricMap.put(prevMetric, metricValue); - timeSliceValueMap.put(prevTimestamp, metricValue); - } - - if (interpolationEnabled) { - Map<Long, Double> interpolatedValues = interpolateMissingPeriods(timelineMetric.getMetricValues(), timeSlices, timeSliceValueMap, timelineMetric.getType()); - for (Map.Entry<Long, Double> entry : interpolatedValues.entrySet()) { - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), entry.getKey()); - timelineClusterMetricMap.putIfAbsent(timelineClusterMetric, entry.getValue()); - } - } - - return timelineClusterMetricMap; - } - - private static Map<Long, Double> interpolateMissingPeriods(TreeMap<Long, Double> metricValues, - List<Long[]> timeSlices, - Map<Long, Double> timeSliceValueMap, String type) { - Map<Long, Double> resultClusterMetricMap = new HashMap<>(); - - if (StringUtils.isNotEmpty(type) && "COUNTER".equalsIgnoreCase(type)) { - //For Counter Based metrics, ok to do interpolation and extrapolation - - List<Long> requiredTimestamps = new ArrayList<>(); - for (Long[] timeSlice : timeSlices) { - if (!timeSliceValueMap.containsKey(timeSlice[1])) { - requiredTimestamps.add(timeSlice[1]); - } - } - Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(metricValues, requiredTimestamps); - - if (interpolatedValuesMap != null) { - for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) { - Double interpolatedValue = entry.getValue(); - - if (interpolatedValue != null) { - resultClusterMetricMap.put( entry.getKey(), interpolatedValue); - } else { - LOG.debug("Cannot compute interpolated value, hence skipping."); - } - } - } - } else { - //For other metrics, ok to do only interpolation - - Double defaultNextSeenValue = null; - if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(metricValues)) { - //If no value was found within the start_time 
based slices, but the metric has value in the server_time range, - // use that. - - Map.Entry<Long,Double> firstEntry = metricValues.firstEntry(); - defaultNextSeenValue = firstEntry.getValue(); - LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue); - } - - for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) { - Long[] timeSlice = timeSlices.get(sliceNum); - - if (!timeSliceValueMap.containsKey(timeSlice[1])) { - LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1])); - - Double lastSeenValue = null; - int index = sliceNum - 1; - Long[] prevTimeSlice = null; - while (lastSeenValue == null && index >= 0) { - prevTimeSlice = timeSlices.get(index--); - lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]); - } - - Double nextSeenValue = null; - index = sliceNum + 1; - Long[] nextTimeSlice = null; - while (nextSeenValue == null && index < timeSlices.size()) { - nextTimeSlice = timeSlices.get(index++); - nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]); - } - - if (nextSeenValue == null) { - nextSeenValue = defaultNextSeenValue; - } - - Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1], - (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue, - (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue); - - if (interpolatedValue != null) { - LOG.debug("Interpolated value : " + interpolatedValue); - resultClusterMetricMap.put(timeSlice[1], interpolatedValue); - } else { - LOG.debug("Cannot compute interpolated value, hence skipping."); - } - } - } - } - return resultClusterMetricMap; - } - - /** - * Return end of the time slice into which the metric fits. 
- */ - public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { - for (Long[] timeSlice : timeSlices) { - if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { - return timeSlice[1]; - } - } - return -1l; - } - - /** - * Return time slices to normalize the timeseries data. - */ - public static List<Long[]> getTimeSlices(long startTime, long endTime, long timeSliceIntervalMillis) { - List<Long[]> timeSlices = new ArrayList<Long[]>(); - long sliceStartTime = startTime; - while (sliceStartTime < endTime) { - timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis }); - sliceStartTime += timeSliceIntervalMillis; - } - return timeSlices; - } - - public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) { - return referenceTime - (referenceTime % aggregatorPeriod); - } - - public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) { - long currentTime = System.currentTimeMillis(); - return currentTime - (currentTime % aggregatorPeriod); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java deleted file mode 100644 index cfd82dd..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/CustomDownSampler.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache 
/**
 * Contract for a pluggable downsampler.
 * Every configured implementation is run as part of an aggregation cycle.
 */
public interface CustomDownSampler {

  /**
   * Checks this downsampler's configuration. Acts as a gate: when it
   * reports failure, no downsampling is performed.
   *
   * @return {@code true} if the configuration is usable
   */
  boolean validateConfigs();

  /**
   * Produces the statements that have to be executed to downsample the
   * given window.
   *
   * @param startTime start of the window
   * @param endTime   end of the window
   * @param tableName table the statements operate on
   * @return the statements to execute
   */
  List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName);

}
- */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; - -/** - * DownSampler utility class. Responsible for fetching downsampler configs from Metrics config, and determine if - * any downsamplers are configured. - */ - -public class DownSamplerUtils { - - public static final String downSamplerConfigPrefix = "timeline.metrics.downsampler."; - public static final String downSamplerMetricPatternsConfig = "metric.patterns"; - public static final String topNDownSampler = "topn"; - private static final Log LOG = LogFactory.getLog(DownSamplerUtils.class); - - - - /** - * Get the list of metrics that are requested to be downsampled. - * @param configuration - * @return List of metric patterns/names that are to be downsampled. 
- */ - public static List<String> getDownsampleMetricPatterns(Configuration configuration) { - Map<String, String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*"); - List<String> metricPatterns = new ArrayList<>(); - Set<String> keys = conf.keySet(); - for (String key : keys) { - if (key.endsWith(downSamplerMetricPatternsConfig)) { - String patternString = conf.get(key); - String[] patterns = StringUtils.split(patternString, ","); - for (String pattern : patterns) { - if (StringUtils.isNotEmpty(pattern)) { - String trimmedPattern = pattern.trim(); - metricPatterns.add(trimmedPattern); - } - } - } - } - return metricPatterns; - } - - /** - * Get the list of downsamplers that are configured in ams-site - * Sample config - <name>timeline.metrics.downsampler.topn.metric.patterns</name> - <value>dfs.NNTopUserOpCounts.windowMs=60000.op%,dfs.NNTopUserOpCounts.windowMs=300000.op%</value> - - <name>timeline.metrics.downsampler.topn.value</name> - <value>10</value> - - <name>timeline.metrics.downsampler.topn.function</name> - <value>max</value> - * @param configuration - * @return - */ - public static List<CustomDownSampler> getDownSamplers(Configuration configuration) { - - Map<String,String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*"); - List<CustomDownSampler> downSamplers = new ArrayList<>(); - Set<String> keys = conf.keySet(); - - try { - for (String key : keys) { - if (key.startsWith(downSamplerConfigPrefix) && key.endsWith(downSamplerMetricPatternsConfig)) { - String type = key.split("\\.")[3]; - CustomDownSampler downSampler = getDownSamplerByType(type, conf); - if (downSampler != null) { - downSamplers.add(downSampler); - } - } - } - } catch (Exception e) { - LOG.warn("Exception caught while parsing downsampler configs from ams-site : " + e.getMessage()); - } - return downSamplers; - } - - public static CustomDownSampler getDownSamplerByType(String type, Map<String, String> conf) { - if (type == null) { - return null; - } - - 
if (StringUtils.isNotEmpty(type) && type.equalsIgnoreCase(topNDownSampler)) { - return TopNDownSampler.fromConfig(conf); - } - - LOG.warn("Unknown downsampler requested : " + type); - return null; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java deleted file mode 100644 index ab9d2e9..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/Function.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Is used to determine metrics aggregate table.
 *
 * <p>A function is a read (aggregate) function such as {@code avg} combined
 * with an optional post processing function such as {@code rate}. Both are
 * requested as {@code ._xxx} suffixes on a metric name.
 *
 * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetric
 * @see org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.TimelineWebServices#getTimelineMetrics
 */
public class Function {
  public static Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null);
  private static final String SUFFIX_SEPARATOR = "\\._";

  private ReadFunction readFunction = ReadFunction.VALUE;
  private PostProcessingFunction postProcessingFunction = null;

  /** Creates the default function: {@link ReadFunction#VALUE}, no post processing. */
  public Function() {
  }

  /**
   * @param readFunction aggregate function; {@code null} keeps the default
   *                     {@link ReadFunction#VALUE}
   * @param ppFunction   post processing function, may be {@code null}
   */
  public Function(ReadFunction readFunction,
                  PostProcessingFunction ppFunction) {
    if (readFunction != null) {
      this.readFunction = readFunction;
    }
    this.postProcessingFunction = ppFunction;
  }

  /**
   * Segregate post processing function eg: rate from aggregate function,
   * example: avg, in any order.
   *
   * <p>Only the segments that follow the first {@code ._} separator are
   * treated as function names; the leading segment is the metric name itself
   * and is never parsed, so a metric called {@code max} is not mistaken for
   * the {@code max} function.
   *
   * @param metricName metric name from request, ex. {@code Metric._rate._avg}
   * @return the parsed function; the default value function when the name
   *         carries no suffix
   * @throws FunctionFormatException if more than two suffixes are given or
   *         none of the suffixes is a known function
   */
  public static Function fromMetricName(String metricName) {
    String[] parts = metricName.split(SUFFIX_SEPARATOR);

    ReadFunction readFunction = ReadFunction.VALUE;
    PostProcessingFunction ppFunction = null;

    if (parts.length <= 1) {
      return new Function(readFunction, null);
    }
    if (parts.length > 3) {
      // FunctionFormatException is an IllegalArgumentException, so callers
      // that caught the broader type keep working.
      throw new FunctionFormatException("Invalid number of functions specified.");
    }

    // Parse functions, best effort: one recognised suffix is enough.
    boolean isSuccessful = false;
    for (int i = 1; i < parts.length; i++) {
      String part = parts[i];
      if (ReadFunction.isPresent(part)) {
        readFunction = ReadFunction.getFunction(part);
        isSuccessful = true;
      }
      if (PostProcessingFunction.isPresent(part)) {
        ppFunction = PostProcessingFunction.getFunction(part);
        isSuccessful = true;
      }
    }

    // Throw exception if parsing failed
    if (!isSuccessful) {
      throw new FunctionFormatException("Could not parse provided functions: " +
        "" + java.util.Arrays.asList(parts));
    }

    return new Function(readFunction, ppFunction);
  }

  /**
   * @return the post processing suffix (if any) followed by the read
   *         function suffix, e.g. {@code ._rate._avg}
   */
  public String getSuffix() {
    return (postProcessingFunction == null) ? readFunction.getSuffix() :
      postProcessingFunction.getSuffix() + readFunction.getSuffix();
  }

  public ReadFunction getReadFunction() {
    return readFunction;
  }

  @Override
  public String toString() {
    return "Function{" +
      "readFunction=" + readFunction +
      ", postProcessingFunction=" + postProcessingFunction +
      '}';
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof Function)) return false;

    Function function = (Function) o;

    return postProcessingFunction == function.postProcessingFunction
      && readFunction == function.readFunction;
  }

  @Override
  public int hashCode() {
    int result = readFunction.hashCode();
    result = 31 * result + (postProcessingFunction != null ?
      postProcessingFunction.hashCode() : 0);
    return result;
  }

  /** Functions applied to the values after they have been read. */
  public enum PostProcessingFunction {
    NONE(""),
    RATE("._rate"),
    DIFF("._diff");

    private final String suffix;

    PostProcessingFunction(String suffix) {
      this.suffix = suffix;
    }

    public String getSuffix() {
      return suffix;
    }

    /**
     * @param functionName candidate name, case-insensitive, may be {@code null}
     * @return true when the name denotes one of the enum constants
     */
    public static boolean isPresent(String functionName) {
      if (functionName == null) {
        return false;
      }
      try {
        // Locale.ROOT: default-locale upper-casing breaks on e.g. Turkish 'i'.
        PostProcessingFunction.valueOf(functionName.toUpperCase(java.util.Locale.ROOT));
      } catch (IllegalArgumentException e) {
        return false;
      }
      return true;
    }

    /**
     * @param functionName function name, case-insensitive; {@code null} maps
     *                     to {@link #NONE}
     * @throws FunctionFormatException if the name is not a known function
     */
    public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException {
      if (functionName == null) {
        return NONE;
      }

      try {
        return PostProcessingFunction.valueOf(functionName.toUpperCase(java.util.Locale.ROOT));
      } catch (IllegalArgumentException e) {
        throw new FunctionFormatException(
          "Function should be ._rate or ._diff. Got " + functionName, e);
      }
    }
  }

  /** Aggregate functions that select which stored value is read. */
  public enum ReadFunction {
    VALUE(""),
    AVG("._avg"),
    MIN("._min"),
    MAX("._max"),
    SUM("._sum");

    private final String suffix;

    ReadFunction(String suffix) {
      this.suffix = suffix;
    }

    public String getSuffix() {
      return suffix;
    }

    /**
     * @param functionName candidate name, case-insensitive, may be {@code null}
     * @return true when the name denotes one of the enum constants
     */
    public static boolean isPresent(String functionName) {
      if (functionName == null) {
        return false;
      }
      try {
        // Locale.ROOT: default-locale upper-casing breaks on e.g. Turkish 'i'.
        ReadFunction.valueOf(functionName.toUpperCase(java.util.Locale.ROOT));
      } catch (IllegalArgumentException e) {
        return false;
      }
      return true;
    }

    /**
     * @param functionName function name, case-insensitive; {@code null} maps
     *                     to {@link #VALUE}
     * @throws FunctionFormatException if the name is not a known function
     */
    public static ReadFunction getFunction(String functionName) throws FunctionFormatException {
      if (functionName == null) {
        return VALUE;
      }
      try {
        return ReadFunction.valueOf(functionName.toUpperCase(java.util.Locale.ROOT));
      } catch (IllegalArgumentException e) {
        throw new FunctionFormatException(
          "Function should be sum, avg, min, max. Got " + functionName, e);
      }
    }
  }

  /** Signals that the function suffixes of a metric name could not be parsed. */
  public static class FunctionFormatException extends IllegalArgumentException {
    public FunctionFormatException(String message) {
      super(message);
    }

    public FunctionFormatException(String message, Throwable cause) {
      super(message, cause);
    }
  }

}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -public class TimelineClusterMetric { - private String metricName; - private String appId; - private String instanceId; - private long timestamp; - - public TimelineClusterMetric(String metricName, String appId, String instanceId, - long timestamp) { - this.metricName = metricName; - this.appId = appId; - this.instanceId = instanceId; - this.timestamp = timestamp; - } - - public String getMetricName() { - return metricName; - } - - public String getAppId() { - return appId; - } - - public String getInstanceId() { - return instanceId; - } - - public long getTimestamp() { - return timestamp; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TimelineClusterMetric that = (TimelineClusterMetric) o; - - if (timestamp != that.timestamp) return false; - if (appId != null ? !appId.equals(that.appId) : that.appId != null) - return false; - if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) - return false; - if (!metricName.equals(that.metricName)) return false; - - return true; - } - - public boolean equalsExceptTime(TimelineClusterMetric metric) { - if (!metricName.equals(metric.metricName)) return false; - if (!appId.equals(metric.appId)) return false; - if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null) - return false; - - return true; - } - @Override - public int hashCode() { - int result = metricName.hashCode(); - result = 31 * result + (appId != null ? appId.hashCode() : 0); - result = 31 * result + (instanceId != null ? 
instanceId.hashCode() : 0); - result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); - return result; - } - - @Override - public String toString() { - return "TimelineClusterMetric{" + - "metricName='" + metricName + '\'' + - ", appId='" + appId + '\'' + - ", instanceId='" + instanceId + '\'' + - ", timestamp=" + timestamp + - '}'; - } - - public void setTimestamp(long timestamp) { - this.timestamp = timestamp; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java deleted file mode 100644 index 150e3f1..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -public interface TimelineMetricAggregator extends Runnable { - /** - * Aggregate metric data within the time bounds. - * - * @param startTime start time millis - * @param endTime end time millis - * @return success - */ - boolean doWork(long startTime, long endTime); - - /** - * Whether the aggregator is disabled by configuration. - * - * @return true if the aggregator is disabled, false otherwise - */ - boolean isDisabled(); - - /** - * Return the aggregator sleep interval. - * - * @return Interval in Millis - */ - Long getSleepIntervalMillis(); - - /** - * Get the aggregator name. - * @return the AGGREGATOR_NAME of this aggregator - */ - AGGREGATOR_NAME getName(); - - /** - * Known aggregator types - */ - enum AGGREGATOR_TYPE { - CLUSTER, - HOST - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java deleted file mode 100644 index 3728d19..0000000 ---
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ /dev/null @@ -1,528 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; - -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.io.FilenameUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricDistributedCache; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; - -/** - * Factory class that knows how to create a aggregator instance using - * TimelineMetricConfiguration - */ -public class TimelineMetricAggregatorFactory { - private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-checkpoint"; - private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-hourly-checkpoint"; - private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE = - "timeline-metrics-host-aggregator-daily-checkpoint"; - 
private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-checkpoint"; - private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-minute-checkpoint"; - private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-hourly-checkpoint"; - private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE = - "timeline-metrics-cluster-aggregator-daily-checkpoint"; - - private static boolean useGroupByAggregator(Configuration metricsConf) { - return Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true")); - } - - /** - * Minute based aggregation for hosts. - * Interval : 5 mins - */ - public static TimelineMetricAggregator createTimelineMetricAggregatorMinute - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; - - String inputTableName = METRICS_RECORD_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - METRIC_RECORD_MINUTE, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - 
inputTableName, - outputTableName, - 120000l, - haController - ); - } - - return new TimelineMetricHostAggregator( - METRIC_RECORD_MINUTE, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController); - } - - /** - * Hourly aggregation for hosts. - * Interval : 1 hour - */ - public static TimelineMetricAggregator createTimelineMetricAggregatorHourly - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; - - String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - METRIC_RECORD_HOURLY, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 3600000l, - haController - ); - } - - return new TimelineMetricHostAggregator( - METRIC_RECORD_HOURLY, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 
3600000l, - haController); - } - - /** - * Daily aggregation for hosts. - * Interval : 1 day - */ - public static TimelineMetricAggregator createTimelineMetricAggregatorDaily - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - HOST_AGGREGATE_DAILY_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED; - - String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - METRIC_RECORD_DAILY, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 3600000l, - haController - ); - } - - return new TimelineMetricHostAggregator( - METRIC_RECORD_DAILY, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 3600000l, - haController); - } - - /** - * Second aggregation for cluster. 
- * Interval : 2 mins - * Timeslice : 30 sec - */ - public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController, - TimelineMetricDistributedCache distributedCache) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - String checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_CHECKPOINT_FILE); - - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l)); - - long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt - (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); - - int checkpointCutOffMultiplier = - metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - - String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED; - - // Second based aggregation have added responsibility of time slicing - if (TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled()) { - return new TimelineMetricClusterAggregatorSecondWithCacheSource( - METRIC_AGGREGATE_SECOND, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - null, - outputTableName, - 120000l, - timeSliceIntervalMillis, - haController, - distributedCache - ); - } - - String inputTableName = METRICS_RECORD_TABLE_NAME; - return new TimelineMetricClusterAggregatorSecond( - METRIC_AGGREGATE_SECOND, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - timeSliceIntervalMillis, - haController - ); - } - - /** - * Minute aggregation 
for cluster. - * Interval : 5 mins - */ - public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - String checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE); - - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - - String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; - String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - METRIC_AGGREGATE_MINUTE, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController - ); - } - - return new TimelineMetricClusterAggregator( - METRIC_AGGREGATE_MINUTE, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController - ); - } - - /** - * Hourly aggregation for cluster. 
- * Interval : 1 hour - */ - public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - String checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); - - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); - - String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - METRIC_AGGREGATE_HOURLY, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController - ); - } - - return new TimelineMetricClusterAggregator( - METRIC_AGGREGATE_HOURLY, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController - ); - } - - /** - * Daily aggregation for cluster. 
- * Interval : 1 day - */ - public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - MetricCollectorHAController haController) { - - String checkpointDir = metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - - String checkpointLocation = FilenameUtils.concat(checkpointDir, - CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE); - - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l)); - - int checkpointCutOffMultiplier = metricsConf.getInt - (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1); - - String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; - String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; - String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - METRIC_AGGREGATE_DAILY, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController - ); - } - - return new TimelineMetricClusterAggregator( - METRIC_AGGREGATE_DAILY, - metadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - aggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController - ); - } - - public static TimelineMetricAggregator createFilteringTimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, TimelineMetricMetadataManager metricMetadataManager, MetricCollectorHAController haController, ConcurrentHashMap<String, Long> postedAggregatedMap) { - String checkpointDir = 
metricsConf.get( - TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); - String checkpointLocation = FilenameUtils.concat(checkpointDir, - HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE); - long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong - (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins - - int checkpointCutOffMultiplier = metricsConf.getInt - (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); - String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; - - String inputTableName = METRICS_RECORD_TABLE_NAME; - String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; - - if (useGroupByAggregator(metricsConf)) { - return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricFilteringHostAggregator( - METRIC_RECORD_MINUTE, - metricMetadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController, - postedAggregatedMap - ); - } - - return new TimelineMetricFilteringHostAggregator( - METRIC_RECORD_MINUTE, - metricMetadataManager, - hBaseAccessor, metricsConf, - checkpointLocation, - sleepIntervalMillis, - checkpointCutOffMultiplier, - hostAggregatorDisabledParam, - inputTableName, - outputTableName, - 120000l, - haController, - postedAggregatedMap); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java deleted file mode 100644 index b06b147..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; - -/** - * Aggregator responsible for providing app level host aggregates. This task - * is accomplished without doing a round trip to storage, rather - * TimelineMetricClusterAggregators are responsible for lifecycle of - * @TimelineMetricAppAggregator and provide the raw data to aggregate. 
- */ -public class TimelineMetricAppAggregator { - private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class); - // Lookup to check candidacy of an app - private final List<String> appIdsToAggregate; - private final Map<String, TimelineMetricHostMetadata> hostMetadata; - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>(); - TimelineMetricMetadataManager metadataManagerInstance; - - public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager, - Configuration metricsConf) { - appIdsToAggregate = getAppIdsForHostAggregation(metricsConf); - hostMetadata = metadataManager.getHostedAppsCache(); - metadataManagerInstance = metadataManager; - LOG.info("AppIds configured for aggregation: " + appIdsToAggregate); - } - - /** - * Lifecycle method to initialize aggregation cycle. - */ - public void init() { - LOG.debug("Initializing aggregation cycle."); - aggregateClusterMetrics = new HashMap<>(); - } - - /** - * Lifecycle method to indicate end of aggregation cycle. - */ - public void cleanup() { - LOG.debug("Cleanup aggregated data."); - aggregateClusterMetrics = null; - } - - /** - * Calculate aggregates if the clusterMetric is a Host metric for recorded - * apps that are housed by this host. - * - * @param clusterMetric @TimelineClusterMetric Host / App metric - * @param hostname This is the hostname from which this clusterMetric originated. - * @param metricValue The metric value for this metric. 
- */ - public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric, - String hostname, Double metricValue) { - - String appId = clusterMetric.getAppId(); - if (appId == null) { - return; // No real use case except tests - } - - // If metric is a host metric and host has apps on it - if (appId.equalsIgnoreCase(HOST_APP_ID)) { - // Candidate metric, update app aggregates - if (hostMetadata.containsKey(hostname)) { - updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue); - } - } else { - // Build the hostedapps map if not a host metric - // Check app candidacy for host aggregation - if (appIdsToAggregate.contains(appId)) { - TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(hostname); - ConcurrentHashMap<String, String> appIds; - if (timelineMetricHostMetadata == null) { - appIds = new ConcurrentHashMap<>(); - hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds)); - } else { - appIds = timelineMetricHostMetadata.getHostedApps(); - } - if (!appIds.containsKey(appId)) { - appIds.put(appId, appId); - LOG.info("Adding appId to hosted apps: appId = " + - clusterMetric.getAppId() + ", hostname = " + hostname); - } - } - } - } - - /** - * Build a cluster app metric from a host metric - */ - private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric, - String hostname, Double metricValue) { - - if (aggregateClusterMetrics == null) { - LOG.error("Aggregation requested without init call."); - return; - } - - TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId()); - ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps(); - for (String appId : apps.keySet()) { - if (appIdsToAggregate.contains(appId)) { - - appKey.setAppId(appId); - TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); - if (appMetadata == null) { - 
TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), HOST_APP_ID, clusterMetric.getInstanceId()); - TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key); - - if (hostMetricMetadata != null) { - TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(clusterMetric.getMetricName(), - appId, clusterMetric.getInstanceId(), hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(), - hostMetricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId)); - metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata); - } - } - - // Add a new cluster aggregate metric if none exists - TimelineClusterMetric appTimelineClusterMetric = - new TimelineClusterMetric(clusterMetric.getMetricName(), - appId, - clusterMetric.getInstanceId(), - clusterMetric.getTimestamp()); - - MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric); - - if (clusterAggregate == null) { - clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue); - aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate); - } else { - clusterAggregate.updateSum(metricValue); - clusterAggregate.updateNumberOfHosts(1); - clusterAggregate.updateMax(metricValue); - clusterAggregate.updateMin(metricValue); - } - } - - } - } - - /** - * Return current copy of aggregated data. - */ - public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() { - return aggregateClusterMetrics; - } - - private List<String> getAppIdsForHostAggregation(Configuration metricsConf) { - String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS); - if (!StringUtils.isEmpty(appIds)) { - return Arrays.asList(StringUtils.stripAll(appIds.split(","))); - } - return Collections.emptyList(); - } -}
