http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java new file mode 100644 index 0000000..43c1f2b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AggregatorUtils.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.aggregators; + + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; + +/** + * + */ +public class AggregatorUtils { + + private static final Log LOG = LogFactory.getLog(AggregatorUtils.class); + + public static double[] calculateAggregates(Map<Long, Double> metricValues) { + double[] values = new double[4]; + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + double sum = 0.0; + int metricCount = 0; + + if (metricValues != null && !metricValues.isEmpty()) { + for (Double value : metricValues.values()) { + // TODO: Some nulls in data - need to investigate null values from host + if (value != null) { + if (value > max) { + max = value; + } + if (value < min) { + min = value; + } + sum += value; + } + } + metricCount = metricValues.values().size(); + } + // BR: WHY ZERO is a good idea? + values[0] = sum; + values[1] = max != Double.MIN_VALUE ? max : 0.0; + values[2] = min != Double.MAX_VALUE ? 
min : 0.0; + values[3] = metricCount; + + return values; + } + + public static Map<TimelineClusterMetric, Double> sliceFromTimelineMetric( + TimelineMetric timelineMetric, List<Long[]> timeSlices, boolean interpolationEnabled) { + + if (timelineMetric.getMetricValues().isEmpty()) { + return null; + } + + Map<TimelineClusterMetric, Double> timelineClusterMetricMap = + new HashMap<>(); + + Long prevTimestamp = -1l; + TimelineClusterMetric prevMetric = null; + int count = 0; + double sum = 0.0; + + Map<Long,Double> timeSliceValueMap = new HashMap<>(); + for (Map.Entry<Long, Double> metric : timelineMetric.getMetricValues().entrySet()) { + if (metric.getValue() == null) { + continue; + } + + Long timestamp = getSliceTimeForMetric(timeSlices, Long.parseLong(metric.getKey().toString())); + if (timestamp != -1) { + // Metric is within desired time range + TimelineClusterMetric clusterMetric = new TimelineClusterMetric( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timestamp); + + if (prevTimestamp < 0 || timestamp.equals(prevTimestamp)) { + Double newValue = metric.getValue(); + if (newValue > 0.0) { + sum += newValue; + count++; + } + } else { + double metricValue = (count > 0) ? (sum / count) : 0.0; + timelineClusterMetricMap.put(prevMetric, metricValue); + timeSliceValueMap.put(prevMetric.getTimestamp(), metricValue); + sum = metric.getValue(); + count = sum > 0.0 ? 1 : 0; + } + + prevTimestamp = timestamp; + prevMetric = clusterMetric; + } + } + + if (prevTimestamp > 0) { + double metricValue = (count > 0) ? 
(sum / count) : 0.0; + timelineClusterMetricMap.put(prevMetric, metricValue); + timeSliceValueMap.put(prevTimestamp, metricValue); + } + + if (interpolationEnabled) { + Map<Long, Double> interpolatedValues = interpolateMissingPeriods(timelineMetric.getMetricValues(), timeSlices, timeSliceValueMap, timelineMetric.getType()); + for (Map.Entry<Long, Double> entry : interpolatedValues.entrySet()) { + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), entry.getKey()); + timelineClusterMetricMap.putIfAbsent(timelineClusterMetric, entry.getValue()); + } + } + + return timelineClusterMetricMap; + } + + private static Map<Long, Double> interpolateMissingPeriods(TreeMap<Long, Double> metricValues, + List<Long[]> timeSlices, + Map<Long, Double> timeSliceValueMap, String type) { + Map<Long, Double> resultClusterMetricMap = new HashMap<>(); + + if (StringUtils.isNotEmpty(type) && "COUNTER".equalsIgnoreCase(type)) { + //For Counter Based metrics, ok to do interpolation and extrapolation + + List<Long> requiredTimestamps = new ArrayList<>(); + for (Long[] timeSlice : timeSlices) { + if (!timeSliceValueMap.containsKey(timeSlice[1])) { + requiredTimestamps.add(timeSlice[1]); + } + } + Map<Long, Double> interpolatedValuesMap = PostProcessingUtil.interpolate(metricValues, requiredTimestamps); + + if (interpolatedValuesMap != null) { + for (Map.Entry<Long, Double> entry : interpolatedValuesMap.entrySet()) { + Double interpolatedValue = entry.getValue(); + + if (interpolatedValue != null) { + resultClusterMetricMap.put( entry.getKey(), interpolatedValue); + } else { + LOG.debug("Cannot compute interpolated value, hence skipping."); + } + } + } + } else { + //For other metrics, ok to do only interpolation + + Double defaultNextSeenValue = null; + if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(metricValues)) { + //If no value was found within the start_time 
based slices, but the metric has value in the server_time range, + // use that. + + Map.Entry<Long,Double> firstEntry = metricValues.firstEntry(); + defaultNextSeenValue = firstEntry.getValue(); + LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue); + } + + for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) { + Long[] timeSlice = timeSlices.get(sliceNum); + + if (!timeSliceValueMap.containsKey(timeSlice[1])) { + LOG.debug("Found an empty slice : " + new Date(timeSlice[0]) + ", " + new Date(timeSlice[1])); + + Double lastSeenValue = null; + int index = sliceNum - 1; + Long[] prevTimeSlice = null; + while (lastSeenValue == null && index >= 0) { + prevTimeSlice = timeSlices.get(index--); + lastSeenValue = timeSliceValueMap.get(prevTimeSlice[1]); + } + + Double nextSeenValue = null; + index = sliceNum + 1; + Long[] nextTimeSlice = null; + while (nextSeenValue == null && index < timeSlices.size()) { + nextTimeSlice = timeSlices.get(index++); + nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]); + } + + if (nextSeenValue == null) { + nextSeenValue = defaultNextSeenValue; + } + + Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1], + (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue, + (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue); + + if (interpolatedValue != null) { + LOG.debug("Interpolated value : " + interpolatedValue); + resultClusterMetricMap.put(timeSlice[1], interpolatedValue); + } else { + LOG.debug("Cannot compute interpolated value, hence skipping."); + } + } + } + } + return resultClusterMetricMap; + } + + /** + * Return end of the time slice into which the metric fits. 
+ */ + public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { + for (Long[] timeSlice : timeSlices) { + if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { + return timeSlice[1]; + } + } + return -1l; + } + + /** + * Return time slices to normalize the timeseries data. + */ + public static List<Long[]> getTimeSlices(long startTime, long endTime, long timeSliceIntervalMillis) { + List<Long[]> timeSlices = new ArrayList<Long[]>(); + long sliceStartTime = startTime; + while (sliceStartTime < endTime) { + timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis }); + sliceStartTime += timeSliceIntervalMillis; + } + return timeSlices; + } + + public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) { + return referenceTime - (referenceTime % aggregatorPeriod); + } + + public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) { + long currentTime = System.currentTimeMillis(); + return currentTime - (currentTime % aggregatorPeriod); + } +}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.List;

/**
 * Interface to add a custom downsampler.
 * Each configured downsampler is executed once during an aggregation cycle.
 */
public interface CustomDownSampler {

  /**
   * Gatekeeper check of the downsampler's configuration. If this fails,
   * the downsampling is not performed.
   *
   * @return true when the configuration is valid and downsampling may run
   */
  boolean validateConfigs();

  /**
   * Build the set of statements that need to be executed for the downsampling.
   *
   * @param startTime aggregation window start, in epoch millis
   * @param endTime   aggregation window end, in epoch millis
   * @param tableName target aggregate table the statements should write to
   * @return SQL statements to execute, in order
   */
  List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName);

}
+ */ + +package org.apache.ambari.metrics.core.timeline.aggregators; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * DownSampler utility class. Responsible for fetching downsampler configs from Metrics config, and determine if + * any downsamplers are configured. + */ + +public class DownSamplerUtils { + + public static final String downSamplerConfigPrefix = "timeline.metrics.downsampler."; + public static final String downSamplerMetricPatternsConfig = "metric.patterns"; + public static final String topNDownSampler = "topn"; + private static final Log LOG = LogFactory.getLog(DownSamplerUtils.class); + + + + /** + * Get the list of metrics that are requested to be downsampled. + * @param configuration + * @return List of metric patterns/names that are to be downsampled. 
+ */ + public static List<String> getDownsampleMetricPatterns(Configuration configuration) { + Map<String, String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*"); + List<String> metricPatterns = new ArrayList<>(); + Set<String> keys = conf.keySet(); + for (String key : keys) { + if (key.endsWith(downSamplerMetricPatternsConfig)) { + String patternString = conf.get(key); + String[] patterns = StringUtils.split(patternString, ","); + for (String pattern : patterns) { + if (StringUtils.isNotEmpty(pattern)) { + String trimmedPattern = pattern.trim(); + metricPatterns.add(trimmedPattern); + } + } + } + } + return metricPatterns; + } + + /** + * Get the list of downsamplers that are configured in ams-site + * Sample config + <name>timeline.metrics.downsampler.topn.metric.patterns</name> + <value>dfs.NNTopUserOpCounts.windowMs=60000.op%,dfs.NNTopUserOpCounts.windowMs=300000.op%</value> + + <name>timeline.metrics.downsampler.topn.value</name> + <value>10</value> + + <name>timeline.metrics.downsampler.topn.function</name> + <value>max</value> + * @param configuration + * @return + */ + public static List<CustomDownSampler> getDownSamplers(Configuration configuration) { + + Map<String,String> conf = configuration.getValByRegex(downSamplerConfigPrefix + "*"); + List<CustomDownSampler> downSamplers = new ArrayList<>(); + Set<String> keys = conf.keySet(); + + try { + for (String key : keys) { + if (key.startsWith(downSamplerConfigPrefix) && key.endsWith(downSamplerMetricPatternsConfig)) { + String type = key.split("\\.")[3]; + CustomDownSampler downSampler = getDownSamplerByType(type, conf); + if (downSampler != null) { + downSamplers.add(downSampler); + } + } + } + } catch (Exception e) { + LOG.warn("Exception caught while parsing downsampler configs from ams-site : " + e.getMessage()); + } + return downSamplers; + } + + public static CustomDownSampler getDownSamplerByType(String type, Map<String, String> conf) { + if (type == null) { + return null; + } + + 
if (StringUtils.isNotEmpty(type) && type.equalsIgnoreCase(topNDownSampler)) { + return TopNDownSampler.fromConfig(conf); + } + + LOG.warn("Unknown downsampler requested : " + type); + return null; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java new file mode 100644 index 0000000..dd67b64 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/Function.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import org.apache.ambari.metrics.webapp.TimelineWebServices; + +import java.util.Arrays; + +/** + * Is used to determine metrics aggregate table. 
/**
 * Parses the read-function and post-processing-function suffixes out of a
 * requested metric name (e.g. {@code Metric._rate._avg}) and is used to
 * determine which metrics aggregate table/column to read.
 */
public class Function {
  /** Default: read the raw value with no post-processing. Immutable constant. */
  public static final Function DEFAULT_VALUE_FUNCTION = new Function(ReadFunction.VALUE, null);
  // Regex that splits on the literal "._" suffix separator.
  private static final String SUFFIX_SEPARATOR = "\\._";

  private ReadFunction readFunction = ReadFunction.VALUE;
  private PostProcessingFunction postProcessingFunction = null;

  public Function() {
  }

  /**
   * @param readFunction read function; null falls back to {@link ReadFunction#VALUE}
   * @param ppFunction   optional post-processing function; may be null
   */
  public Function(ReadFunction readFunction, PostProcessingFunction ppFunction) {
    if (readFunction != null) {
      this.readFunction = readFunction;
    }
    this.postProcessingFunction = ppFunction;
  }

  /**
   * Segregate post processing function (e.g. rate) from aggregate function
   * (e.g. avg); the two may appear in any order.
   *
   * @param metricName metric name from the request, e.g. {@code Metric._rate._avg}
   * @return parsed {@link Function}
   * @throws FunctionFormatException when a suffix is present but none of the
   *         parts is a known function, or more than two suffixes are given
   */
  public static Function fromMetricName(String metricName) {
    // e.g. "Metric._rate._avg" -> ["Metric", "rate", "avg"]
    String[] parts = metricName.split(SUFFIX_SEPARATOR);

    ReadFunction readFunction = ReadFunction.VALUE;
    PostProcessingFunction ppFunction = null;

    if (parts.length <= 1) {
      // No suffix at all: plain value read.
      return new Function(readFunction, null);
    }
    if (parts.length > 3) {
      throw new IllegalArgumentException("Invalid number of functions specified.");
    }

    // Parse functions - best effort, any order.
    boolean isSuccessful = false;
    for (String part : parts) {
      if (ReadFunction.isPresent(part)) {
        readFunction = ReadFunction.getFunction(part);
        isSuccessful = true;
      }
      if (PostProcessingFunction.isPresent(part)) {
        ppFunction = PostProcessingFunction.getFunction(part);
        isSuccessful = true;
      }
    }

    // A suffix was present but nothing matched: reject.
    if (!isSuccessful) {
      throw new FunctionFormatException("Could not parse provided functions: "
        + java.util.Arrays.asList(parts));
    }

    return new Function(readFunction, ppFunction);
  }

  /** Combined column suffix, post-processing suffix first (e.g. "._rate._avg"). */
  public String getSuffix() {
    return (postProcessingFunction == null) ? readFunction.getSuffix()
      : postProcessingFunction.getSuffix() + readFunction.getSuffix();
  }

  public ReadFunction getReadFunction() {
    return readFunction;
  }

  @Override
  public String toString() {
    return "Function{" +
      "readFunction=" + readFunction +
      ", postProcessingFunction=" + postProcessingFunction +
      '}';
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof Function)) return false;

    Function function = (Function) o;

    return postProcessingFunction == function.postProcessingFunction
      && readFunction == function.readFunction;
  }

  @Override
  public int hashCode() {
    int result = readFunction.hashCode();
    result = 31 * result + (postProcessingFunction != null ?
      postProcessingFunction.hashCode() : 0);
    return result;
  }

  /** Transformations applied after reading, encoded as a "._xxx" suffix. */
  public enum PostProcessingFunction {
    NONE(""),
    RATE("._rate"),
    DIFF("._diff");

    private final String suffix;

    PostProcessingFunction(String suffix) {
      this.suffix = suffix;
    }

    public String getSuffix() {
      return suffix;
    }

    public static boolean isPresent(String functionName) {
      try {
        PostProcessingFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        return false;
      }
      return true;
    }

    public static PostProcessingFunction getFunction(String functionName) throws FunctionFormatException {
      if (functionName == null) {
        return NONE;
      }

      try {
        return PostProcessingFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        // Old message only mentioned ._rate; include all options and the input.
        throw new FunctionFormatException(
          "Function should be ._rate or ._diff. Got " + functionName, e);
      }
    }
  }

  /** Aggregate read functions, encoded as a "._xxx" suffix on the metric name. */
  public enum ReadFunction {
    VALUE(""),
    AVG("._avg"),
    MIN("._min"),
    MAX("._max"),
    SUM("._sum");

    private final String suffix;

    ReadFunction(String suffix) {
      this.suffix = suffix;
    }

    public String getSuffix() {
      return suffix;
    }

    public static boolean isPresent(String functionName) {
      try {
        ReadFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        return false;
      }
      return true;
    }

    public static ReadFunction getFunction(String functionName) throws FunctionFormatException {
      if (functionName == null) {
        return VALUE;
      }
      try {
        return ReadFunction.valueOf(functionName.toUpperCase());
      } catch (IllegalArgumentException e) {
        throw new FunctionFormatException(
          "Function should be sum, avg, min, max. Got " + functionName, e);
      }
    }
  }

  /** Thrown when a metric name carries an unparsable function suffix. */
  public static class FunctionFormatException extends IllegalArgumentException {
    public FunctionFormatException(String message) {
      super(message);
    }

    public FunctionFormatException(String message, Throwable cause) {
      super(message, cause);
    }
  }

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +public class TimelineClusterMetric { + private String metricName; + private String appId; + private String instanceId; + private long timestamp; + + public TimelineClusterMetric(String metricName, String appId, String instanceId, + long timestamp) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + this.timestamp = timestamp; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + public String getInstanceId() { + return instanceId; + } + + public long getTimestamp() { + return timestamp; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineClusterMetric that = (TimelineClusterMetric) o; + + if (timestamp != that.timestamp) return false; + if (appId != null ? !appId.equals(that.appId) : that.appId != null) + return false; + if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null) + return false; + if (!metricName.equals(that.metricName)) return false; + + return true; + } + + public boolean equalsExceptTime(TimelineClusterMetric metric) { + if (!metricName.equals(metric.metricName)) return false; + if (!appId.equals(metric.appId)) return false; + if (instanceId != null ? 
!instanceId.equals(metric.instanceId) : metric.instanceId != null) + return false; + + return true; + } + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "TimelineClusterMetric{" + + "metricName='" + metricName + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + ", timestamp=" + timestamp + + '}'; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java new file mode 100644 index 0000000..3698f1b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregator.java @@ -0,0 +1,59 @@ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface TimelineMetricAggregator extends Runnable { + /** + * Aggregate metric data within the time bounds. + * + * @param startTime start time millis + * @param endTime end time millis + * @return success + */ + boolean doWork(long startTime, long endTime); + + /** + * Is aggregator is disabled by configuration. + * + * @return true/false + */ + boolean isDisabled(); + + /** + * Return aggregator Interval + * + * @return Interval in Millis + */ + Long getSleepIntervalMillis(); + + /** + * Get aggregator name + * @return @AGGREGATOR_NAME + */ + AGGREGATOR_NAME getName(); + + /** + * Known aggregator types + */ + enum AGGREGATOR_TYPE { + CLUSTER, + HOST + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java new file mode 100644 index 0000000..b395b39 --- /dev/null +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; 
+import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_DISABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static 
org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;

import java.util.concurrent.ConcurrentHashMap;

import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;

/**
 * Factory class that knows how to create a aggregator instance using
 * TimelineMetricConfiguration
 */
public class TimelineMetricAggregatorFactory {
  // Per-aggregator checkpoint file names. Each file is placed under the
  // directory configured by TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR
  // (falling back to DEFAULT_CHECKPOINT_LOCATION) by the create* methods.
  private static final String HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE =
    "timeline-metrics-host-aggregator-checkpoint";
  private static final String HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE =
    "timeline-metrics-host-aggregator-hourly-checkpoint";
  private static final String HOST_AGGREGATE_DAILY_CHECKPOINT_FILE =
    "timeline-metrics-host-aggregator-daily-checkpoint";
  private static final String CLUSTER_AGGREGATOR_CHECKPOINT_FILE =
    "timeline-metrics-cluster-aggregator-checkpoint";
  private static final String CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE =
    "timeline-metrics-cluster-aggregator-minute-checkpoint";
  private static final String CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE =
    "timeline-metrics-cluster-aggregator-hourly-checkpoint";
  private static final String CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE =
    "timeline-metrics-cluster-aggregator-daily-checkpoint";

  // Whether the v2 (GROUP BY query based) aggregator implementations should
  // be used. Defaults to "true"; set USE_GROUPBY_AGGREGATOR_QUERIES to
  // "false" to fall back to the v1 aggregators.
  private static boolean useGroupByAggregator(Configuration metricsConf) {
    return Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"));
  }

  /**
   * Minute based aggregation for hosts.
+ * Interval : 5 mins + */ + public static TimelineMetricAggregator createTimelineMetricAggregatorMinute + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator( + METRIC_RECORD_MINUTE, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + return new TimelineMetricHostAggregator( + METRIC_RECORD_MINUTE, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController); + } + + /** + * Hourly aggregation for hosts. 
+ * Interval : 1 hour + */ + public static TimelineMetricAggregator createTimelineMetricAggregatorHourly + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + HOST_AGGREGATE_HOURLY_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED; + + String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator( + METRIC_RECORD_HOURLY, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l, + haController + ); + } + + return new TimelineMetricHostAggregator( + METRIC_RECORD_HOURLY, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l, + haController); + } + + /** + * Daily aggregation for hosts. 
+ * Interval : 1 day + */ + public static TimelineMetricAggregator createTimelineMetricAggregatorDaily + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + HOST_AGGREGATE_DAILY_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_DAILY_DISABLED; + + String inputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_DAILY_TABLE_NAME; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricHostAggregator( + METRIC_RECORD_DAILY, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l, + haController + ); + } + + return new TimelineMetricHostAggregator( + METRIC_RECORD_DAILY, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 3600000l, + haController); + } + + /** + * Second aggregation for cluster. 
+ * Interval : 2 mins + * Timeslice : 30 sec + */ + public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController, + TimelineMetricDistributedCache distributedCache) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120l)); + + long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt + (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); + + int checkpointCutOffMultiplier = + metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + String outputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_SECOND_DISABLED; + + // Second based aggregation have added responsibility of time slicing + if (TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled()) { + return new TimelineMetricClusterAggregatorSecondWithCacheSource( + METRIC_AGGREGATE_SECOND, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + null, + outputTableName, + 120000l, + timeSliceIntervalMillis, + haController, + distributedCache + ); + } + + String inputTableName = METRICS_RECORD_TABLE_NAME; + return new TimelineMetricClusterAggregatorSecond( + METRIC_AGGREGATE_SECOND, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + timeSliceIntervalMillis, + haController + ); + } + + /** + * Minute aggregation 
for cluster. + * Interval : 5 mins + */ + public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_MINUTE_DISABLED; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator( + METRIC_AGGREGATE_MINUTE, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + return new TimelineMetricClusterAggregator( + METRIC_AGGREGATE_MINUTE, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + /** + * Hourly aggregation for cluster. 
+ * Interval : 1 hour + */ + public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_HOURLY_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2); + + String inputTableName = METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_HOUR_DISABLED; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator( + METRIC_AGGREGATE_HOURLY, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + return new TimelineMetricClusterAggregator( + METRIC_AGGREGATE_HOURLY, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + /** + * Daily aggregation for cluster. 
+ * Interval : 1 day + */ + public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricMetadataManager metadataManager, + MetricCollectorHAController haController) { + + String checkpointDir = metricsConf.get( + TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + + String checkpointLocation = FilenameUtils.concat(checkpointDir, + CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_FILE); + + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL, 86400l)); + + int checkpointCutOffMultiplier = metricsConf.getInt + (CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER, 1); + + String inputTableName = METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; + String outputTableName = METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME; + String aggregatorDisabledParam = CLUSTER_AGGREGATOR_DAILY_DISABLED; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricClusterAggregator( + METRIC_AGGREGATE_DAILY, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + return new TimelineMetricClusterAggregator( + METRIC_AGGREGATE_DAILY, + metadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + aggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController + ); + } + + public static TimelineMetricAggregator createFilteringTimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, TimelineMetricMetadataManager metricMetadataManager, MetricCollectorHAController haController, ConcurrentHashMap<String, Long> postedAggregatedMap) { + String checkpointDir = metricsConf.get( + 
TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); + String checkpointLocation = FilenameUtils.concat(checkpointDir, + HOST_AGGREGATE_MINUTE_CHECKPOINT_FILE); + long sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong + (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins + + int checkpointCutOffMultiplier = metricsConf.getInt + (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3); + String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED; + + String inputTableName = METRICS_RECORD_TABLE_NAME; + String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME; + + if (useGroupByAggregator(metricsConf)) { + return new org.apache.ambari.metrics.core.timeline.aggregators.v2.TimelineMetricFilteringHostAggregator( + METRIC_RECORD_MINUTE, + metricMetadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController, + postedAggregatedMap + ); + } + + return new TimelineMetricFilteringHostAggregator( + METRIC_RECORD_MINUTE, + metricMetadataManager, + hBaseAccessor, metricsConf, + checkpointLocation, + sleepIntervalMillis, + checkpointCutOffMultiplier, + hostAggregatorDisabledParam, + inputTableName, + outputTableName, + 120000l, + haController, + postedAggregatedMap); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java new file mode 100644 index 
0000000..190ad9a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAppAggregator.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.TimelineMetricsFilter; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; + +/** + * Aggregator responsible for providing app level host aggregates. This task + * is accomplished without doing a round trip to storage, rather + * TimelineMetricClusterAggregators are responsible for lifecycle of + * @TimelineMetricAppAggregator and provide the raw data to aggregate. 
 */
public class TimelineMetricAppAggregator {
  private static final Log LOG = LogFactory.getLog(TimelineMetricAppAggregator.class);
  // Lookup to check candidacy of an app (from CLUSTER_AGGREGATOR_APP_IDS config)
  private final List<String> appIdsToAggregate;
  // Live view of the metadata manager's hosted-apps cache; mutated as metrics
  // reveal new appId -> host associations.
  private final Map<String, TimelineMetricHostMetadata> hostMetadata;
  // Scratch map for the current aggregation cycle: reset by init(),
  // dropped (set to null) by cleanup().
  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>();
  TimelineMetricMetadataManager metadataManagerInstance;

  public TimelineMetricAppAggregator(TimelineMetricMetadataManager metadataManager,
                                     Configuration metricsConf) {
    appIdsToAggregate = getAppIdsForHostAggregation(metricsConf);
    hostMetadata = metadataManager.getHostedAppsCache();
    metadataManagerInstance = metadataManager;
    LOG.info("AppIds configured for aggregation: " + appIdsToAggregate);
  }

  /**
   * Lifecycle method to initialize aggregation cycle.
   */
  public void init() {
    LOG.debug("Initializing aggregation cycle.");
    aggregateClusterMetrics = new HashMap<>();
  }

  /**
   * Lifecycle method to indicate end of aggregation cycle.
   * After this call processTimelineClusterMetric() logs an error and is a
   * no-op until init() is called again.
   */
  public void cleanup() {
    LOG.debug("Cleanup aggregated data.");
    aggregateClusterMetrics = null;
  }

  /**
   * Calculate aggregates if the clusterMetric is a Host metric for recorded
   * apps that are housed by this host.
   *
   * @param clusterMetric @TimelineClusterMetric Host / App metric
   * @param hostname This is the hostname from which this clusterMetric originated.
   * @param metricValue The metric value for this metric.
 */
public void processTimelineClusterMetric(TimelineClusterMetric clusterMetric,
    String hostname, Double metricValue) {

  String appId = clusterMetric.getAppId();
  if (appId == null) {
    return; // No real use case except tests
  }

  // If metric is a host metric and host has apps on it
  if (appId.equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
    // Candidate metric, update app aggregates
    if (hostMetadata.containsKey(hostname)) {
      updateAppAggregatesFromHostMetric(clusterMetric, hostname, metricValue);
    }
  } else {
    // Build the hostedapps map if not a host metric
    // Check app candidacy for host aggregation
    if (appIdsToAggregate.contains(appId)) {
      TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(hostname);
      ConcurrentHashMap<String, String> appIds;
      if (timelineMetricHostMetadata == null) {
        // NOTE(review): get-then-put on the shared cache is not atomic;
        // concurrent callers could each install a fresh entry and lose
        // appIds — confirm this is only invoked single-threaded per cycle.
        appIds = new ConcurrentHashMap<>();
        hostMetadata.put(hostname, new TimelineMetricHostMetadata(appIds));
      } else {
        appIds = timelineMetricHostMetadata.getHostedApps();
      }
      if (!appIds.containsKey(appId)) {
        // Map used as a set: key == value == appId.
        appIds.put(appId, appId);
        LOG.info("Adding appId to hosted apps: appId = " +
          clusterMetric.getAppId() + ", hostname = " + hostname);
      }
    }
  }
}

/**
 * Build a cluster app metric from a host metric
 */
private void updateAppAggregatesFromHostMetric(TimelineClusterMetric clusterMetric,
    String hostname, Double metricValue) {

  // Guard: cleanup() nulls the map; aggregation outside init()/cleanup() is
  // an error, not an exception.
  if (aggregateClusterMetrics == null) {
    LOG.error("Aggregation requested without init call.");
    return;
  }

  // Key object is reused across iterations; only the appId field is
  // rewritten inside the loop below.
  TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
  ConcurrentHashMap<String, String> apps = hostMetadata.get(hostname).getHostedApps();
  for (String appId : apps.keySet()) {
    if (appIdsToAggregate.contains(appId)) {

      appKey.setAppId(appId);
      TimelineMetricMetadata appMetadata = metadataManagerInstance.getMetadataCacheValue(appKey);
      if (appMetadata == null) {
        // No app-level metadata yet: seed it by cloning the host metric's
        // metadata (units/type/series start) under the app's id.
        TimelineMetricMetadataKey key = new TimelineMetricMetadataKey(clusterMetric.getMetricName(), TimelineMetricConfiguration.HOST_APP_ID, clusterMetric.getInstanceId());
        TimelineMetricMetadata hostMetricMetadata = metadataManagerInstance.getMetadataCacheValue(key);

        if (hostMetricMetadata != null) {
          TimelineMetricMetadata timelineMetricMetadata = new TimelineMetricMetadata(clusterMetric.getMetricName(),
            appId, clusterMetric.getInstanceId(), hostMetricMetadata.getUnits(), hostMetricMetadata.getType(), hostMetricMetadata.getSeriesStartTime(),
            hostMetricMetadata.isSupportsAggregates(), TimelineMetricsFilter.acceptMetric(clusterMetric.getMetricName(), appId));
          metadataManagerInstance.putIfModifiedTimelineMetricMetadata(timelineMetricMetadata);
        }
      }

      // Add a new cluster aggregate metric if none exists
      TimelineClusterMetric appTimelineClusterMetric =
        new TimelineClusterMetric(clusterMetric.getMetricName(),
          appId,
          clusterMetric.getInstanceId(),
          clusterMetric.getTimestamp());

      MetricClusterAggregate clusterAggregate = aggregateClusterMetrics.get(appTimelineClusterMetric);

      if (clusterAggregate == null) {
        clusterAggregate = new MetricClusterAggregate(metricValue, 1, null, metricValue, metricValue);
        aggregateClusterMetrics.put(appTimelineClusterMetric, clusterAggregate);
      } else {
        // Fold this host's value into the app aggregate.
        clusterAggregate.updateSum(metricValue);
        clusterAggregate.updateNumberOfHosts(1);
        clusterAggregate.updateMax(metricValue);
        clusterAggregate.updateMin(metricValue);
      }
    }

  }
}

/**
 * Return current copy of aggregated data.
 */
public Map<TimelineClusterMetric, MetricClusterAggregate> getAggregateClusterMetrics() {
  // NOTE(review): despite the javadoc saying "copy", this returns the live
  // map; callers share state with this aggregator until cleanup().
  return aggregateClusterMetrics;
}

// Parses the comma separated CLUSTER_AGGREGATOR_APP_IDS config value;
// empty/absent config yields an immutable empty list.
private List<String> getAppIdsForHostAggregation(Configuration metricsConf) {
  String appIds = metricsConf.get(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS);
  if (!StringUtils.isEmpty(appIds)) {
    return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
  }
  return Collections.emptyList();
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
new file mode 100644
index 0000000..2ea5309
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -0,0 +1,150 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline.aggregators;

import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;

/**
 * Rolls cluster level aggregates up to a coarser time granularity by reading
 * a cluster aggregate table over the checkpointed window, averaging per
 * metric, and writing the result to the output table.
 */
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
  private final TimelineMetricReadHelper readHelper;
  // True when reading the second-level precision table; selects which SQL
  // template and which ResultSet reader to use below.
  private final boolean isClusterPrecisionInputTable;

  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
                                         TimelineMetricMetadataManager metricMetadataManager,
                                         PhoenixHBaseAccessor hBaseAccessor,
                                         Configuration metricsConf,
                                         String checkpointLocation,
                                         Long sleepIntervalMillis,
                                         Integer checkpointCutOffMultiplier,
                                         String hostAggregatorDisabledParam,
                                         String inputTableName,
                                         String outputTableName,
                                         Long nativeTimeRangeDelay,
                                         MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier,
      hostAggregatorDisabledParam, inputTableName, outputTableName,
      nativeTimeRangeDelay, haController);
    isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
    readHelper = new TimelineMetricReadHelper(metricMetadataManager, true);
  }

  /**
   * Builds the unbounded (no limit) scan over [startTime, endTime], ordered
   * by UUID then SERVER_TIME so rows for one metric arrive contiguously —
   * aggregateMetricsFromResultSet() relies on that ordering.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    Condition condition = new DefaultCondition(null, null, null, null, startTime,
      endTime, null, null, true);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL, tableName);
    // HOST_COUNT vs METRIC_COUNT
    if (isClusterPrecisionInputTable) {
      sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL, tableName);
    }

    condition.setStatement(sqlStr);
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }

  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);

    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
    hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName);
  }

  /**
   * Single pass over the (UUID, SERVER_TIME)-ordered result set, keeping a
   * "current metric" cursor. Sums are converted to averages when the cursor
   * switches to a new metric, and once more for the final group.
   */
  private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
    throws IOException, SQLException {

    TimelineClusterMetric existingMetric = null;
    MetricHostAggregate hostAggregate = null;
    Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
      new HashMap<TimelineClusterMetric, MetricHostAggregate>();
    int perMetricCount = 0;

    while (rs.next()) {
      TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);

      MetricClusterAggregate currentHostAggregate =
        isClusterPrecisionInputTable ?
          readHelper.getMetricClusterAggregateFromResultSet(rs) :
          readHelper.getMetricClusterTimeAggregateFromResultSet(rs);

      if (existingMetric == null) {
        // First row: perMetricCount is incremented here AND in the
        // equalsExceptTime branch below (which always matches the first
        // row), hence the (perMetricCount - 1) divisors at switch time.
        existingMetric = currentMetric;
        currentMetric.setTimestamp(endTime);
        hostAggregate = new MetricHostAggregate();
        hostAggregateMap.put(currentMetric, hostAggregate);
        perMetricCount++;
      }

      if (existingMetric.equalsExceptTime(currentMetric)) {
        // Recalculate totals with current metric
        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
        perMetricCount++;
      } else {
        // Switched over to a new metric - save new metric

        hostAggregate.setSum(hostAggregate.getSum() / (perMetricCount - 1));
        hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)(perMetricCount - 1)));
        perMetricCount = 1;

        hostAggregate = new MetricHostAggregate();
        currentMetric.setTimestamp(endTime);
        updateAggregatesFromHost(hostAggregate, currentHostAggregate);
        hostAggregateMap.put(currentMetric, hostAggregate);
        existingMetric = currentMetric;
      }

    }

    // NOTE(review): when the result set contains exactly one metric group
    // (no switch ever happened), perMetricCount is rows + 1 because of the
    // first-row double increment above, so this divisor looks off by one;
    // groups preceded by a switch divide by the exact row count. Confirm
    // against expected averages.
    if (existingMetric != null) {
      hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
      hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount));
    }

    return hostAggregateMap;
  }

  // Folds one input-row aggregate into the running output aggregate; the
  // input row's host count becomes additional samples in the output.
  private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) {
    agg.updateMax(currentClusterAggregate.getMax());
    agg.updateMin(currentClusterAggregate.getMin());
    agg.updateSum(currentClusterAggregate.getSum());
    agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java new file mode 100644 index 0000000..46c82d6 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.aggregators; + + +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.mutable.MutableInt; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import 
org.apache.ambari.metrics.core.timeline.query.DefaultCondition; + +/** + * Aggregates a metric across all hosts in the cluster. Reads metrics from + * the precision table and saves into the aggregate. + */ +public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator { + public Long timeSliceIntervalMillis; + private TimelineMetricReadHelper timelineMetricReadHelper; + // Aggregator to perform app-level aggregates for host metrics + private final TimelineMetricAppAggregator appAggregator; + // 1 minute client side buffering adjustment + protected final Long serverTimeShiftAdjustment; + protected final boolean interpolationEnabled; + private TimelineMetricMetadataManager metadataManagerInstance; + private String skipAggrPatternStrings; + private final static String liveHostsMetricName = "live_hosts"; + + public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName, + TimelineMetricMetadataManager metadataManager, + PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf, + String checkpointLocation, + Long sleepIntervalMillis, + Integer checkpointCutOffMultiplier, + String aggregatorDisabledParam, + String tableName, + String outputTableName, + Long nativeTimeRangeDelay, + Long timeSliceInterval, + MetricCollectorHAController haController) { + super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, + sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, + tableName, outputTableName, nativeTimeRangeDelay, haController); + + this.metadataManagerInstance = metadataManager; + appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf); + this.timeSliceIntervalMillis = timeSliceInterval; + this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000")); + this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true")); + this.skipAggrPatternStrings = 
metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS); + this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true); + } + + @Override + protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException { + // Account for time shift due to client side buffering by shifting the + // timestamps with the difference between server time and series start time + // Also, we do not want to look at the shift time period from the end as well since we can interpolate those points + // that come earlier than the expected, during the next run. + List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis); + // Initialize app aggregates for host metrics + appAggregator.init(); + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = + aggregateMetricsFromResultSet(rs, timeSlices); + + LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); + hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); + appAggregator.cleanup(); + } + + @Override + protected Condition prepareMetricQueryCondition(long startTime, long endTime) { + + List<String> metricNames = new ArrayList<>(); + boolean metricNamesNotCondition = false; + + if (!StringUtils.isEmpty(skipAggrPatternStrings)) { + LOG.info("Skipping aggregation for metric patterns : " + skipAggrPatternStrings); + metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(","))); + metricNamesNotCondition = true; + } + + Condition condition = new DefaultCondition(metricNames, null, null, null, startTime - serverTimeShiftAdjustment, + endTime, null, null, true); + condition.setMetricNamesNotCondition(metricNamesNotCondition); + condition.setNoLimit(); + condition.setFetchSize(resultsetFetchSize); + condition.setStatement(String.format(GET_METRIC_SQL, + METRICS_RECORD_TABLE_NAME)); + // Retaining order of the row-key avoids client side merge 
sort. + condition.addOrderByColumn("UUID"); + condition.addOrderByColumn("SERVER_TIME"); + return condition; + } + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) + throws SQLException, IOException { + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + TimelineMetric metric = null; + Map<String, MutableInt> hostedAppCounter = new HashMap<>(); + if (rs.next()) { + metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); + + // Call slice after all rows for a host are read + while (rs.next()) { + TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); + // If rows belong to same host combine them before slicing. This + // avoids issues across rows that belong to same hosts but get + // counted as coming from different ones. + if (metric.equalsExceptTime(nextMetric)) { + metric.addMetricValues(nextMetric.getMetricValues()); + } else { + // Process the current metric + int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); + if (!hostedAppCounter.containsKey(metric.getAppId())) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } else { + int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue(); + if (currentHostCount < numHosts) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } + } + metric = nextMetric; + } + } + } + // Process last metric + if (metric != null) { + int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); + if (!hostedAppCounter.containsKey(metric.getAppId())) { + hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); + } else { + int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue(); + if (currentHostCount < numHosts) { + hostedAppCounter.put(metric.getAppId(), new 
MutableInt(numHosts)); + } + } + } + + // Add app level aggregates to save + aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); + + // Add liveHosts per AppId metrics. + long timestamp = timeSlices.get(timeSlices.size() - 1)[1]; + processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp); + + return aggregateClusterMetrics; + } + + /** + * Slice metric values into interval specified by : + * timeline.metrics.cluster.aggregator.minute.timeslice.interval + * Normalize value by averaging them within the interval + */ + protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, + TimelineMetric metric, List<Long[]> timeSlices) { + // Create time slices + TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId()); + TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); + + if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) { + LOG.debug("Skipping cluster aggregation for " + metric.getMetricName()); + return 0; + } + + Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled); + + return aggregateClusterMetricsFromSlices(clusterMetrics, aggregateClusterMetrics, metric.getHostName()); + } + + protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics, + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, + String hostname) { + + int numHosts = 0; + if (clusterMetrics != null && !clusterMetrics.isEmpty()) { + for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) { + + TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); + Double avgValue = clusterMetricEntry.getValue(); + + MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); + + if 
(aggregate == null) { + aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); + aggregateClusterMetrics.put(clusterMetric, aggregate); + } else { + aggregate.updateSum(avgValue); + aggregate.updateNumberOfHosts(1); + aggregate.updateMax(avgValue); + aggregate.updateMin(avgValue); + } + + numHosts = aggregate.getNumberOfHosts(); + // Update app level aggregates + appAggregator.processTimelineClusterMetric(clusterMetric, hostname, avgValue); + } + } + return numHosts; + } + + /* Add cluster metric for number of hosts that are hosting an appId */ + protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, + Map<String, MutableInt> appHostsCount, long timestamp) { + + for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) { + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric( + liveHostsMetricName, appHostsEntry.getKey(), null, timestamp); + + Integer numOfHosts = appHostsEntry.getValue().intValue(); + + MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate( + (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts); + + metadataManagerInstance.getUuid(timelineClusterMetric); + + aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate); + } + + } +}
