http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java deleted file mode 100644 index 5c370f4..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_TIME_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; - -public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { - private final TimelineMetricReadHelper readHelper; - private final boolean isClusterPrecisionInputTable; - - public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName, - TimelineMetricMetadataManager metricMetadataManager, - PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, 
- Integer checkpointCutOffMultiplier, - String hostAggregatorDisabledParam, - String inputTableName, - String outputTableName, - Long nativeTimeRangeDelay, - MetricCollectorHAController haController) { - super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, - sleepIntervalMillis, checkpointCutOffMultiplier, - hostAggregatorDisabledParam, inputTableName, outputTableName, - nativeTimeRangeDelay, haController); - isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME); - readHelper = new TimelineMetricReadHelper(metricMetadataManager, true); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - String sqlStr = String.format(GET_CLUSTER_AGGREGATE_TIME_SQL, tableName); - // HOST_COUNT vs METRIC_COUNT - if (isClusterPrecisionInputTable) { - sqlStr = String.format(GET_CLUSTER_AGGREGATE_SQL, tableName); - } - - condition.setStatement(sqlStr); - condition.addOrderByColumn("UUID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException { - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime); - - LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, outputTableName); - } - - private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime) - throws IOException, SQLException { - - TimelineClusterMetric existingMetric = null; - MetricHostAggregate hostAggregate = null; - Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = - new HashMap<TimelineClusterMetric, 
MetricHostAggregate>(); - int perMetricCount = 0; - - while (rs.next()) { - TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs); - - MetricClusterAggregate currentHostAggregate = - isClusterPrecisionInputTable ? - readHelper.getMetricClusterAggregateFromResultSet(rs) : - readHelper.getMetricClusterTimeAggregateFromResultSet(rs); - - if (existingMetric == null) { - // First row - existingMetric = currentMetric; - currentMetric.setTimestamp(endTime); - hostAggregate = new MetricHostAggregate(); - hostAggregateMap.put(currentMetric, hostAggregate); - perMetricCount++; - } - - if (existingMetric.equalsExceptTime(currentMetric)) { - // Recalculate totals with current metric - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - perMetricCount++; - } else { - // Switched over to a new metric - save new metric - - hostAggregate.setSum(hostAggregate.getSum() / perMetricCount); - hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount)); - perMetricCount = 1; - - hostAggregate = new MetricHostAggregate(); - currentMetric.setTimestamp(endTime); - updateAggregatesFromHost(hostAggregate, currentHostAggregate); - hostAggregateMap.put(currentMetric, hostAggregate); - existingMetric = currentMetric; - } - - } - - return hostAggregateMap; - } - - private void updateAggregatesFromHost(MetricHostAggregate agg, MetricClusterAggregate currentClusterAggregate) { - agg.updateMax(currentClusterAggregate.getMax()); - agg.updateMin(currentClusterAggregate.getMin()); - agg.updateSum(currentClusterAggregate.getSum()); - agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts()); - } -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java deleted file mode 100644 index bbe8f7b..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.SERVER_SIDE_TIMESIFT_ADJUSTMENT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; - -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.mutable.MutableInt; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; - -/** - * Aggregates a metric across all hosts in the cluster. Reads metrics from - * the precision table and saves into the aggregate. - */ -public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggregator { - public Long timeSliceIntervalMillis; - private TimelineMetricReadHelper timelineMetricReadHelper; - // Aggregator to perform app-level aggregates for host metrics - private final TimelineMetricAppAggregator appAggregator; - // 1 minute client side buffering adjustment - protected final Long serverTimeShiftAdjustment; - protected final boolean interpolationEnabled; - private TimelineMetricMetadataManager metadataManagerInstance; - private String skipAggrPatternStrings; - private final static String liveHostsMetricName = "live_hosts"; - - public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName, - TimelineMetricMetadataManager metadataManager, - PhoenixHBaseAccessor hBaseAccessor, - Configuration metricsConf, - String checkpointLocation, - Long sleepIntervalMillis, - Integer checkpointCutOffMultiplier, - String aggregatorDisabledParam, - String tableName, - String outputTableName, - Long nativeTimeRangeDelay, - Long timeSliceInterval, - MetricCollectorHAController haController) { - super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, - sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, - tableName, outputTableName, 
nativeTimeRangeDelay, haController); - - this.metadataManagerInstance = metadataManager; - appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf); - this.timeSliceIntervalMillis = timeSliceInterval; - this.serverTimeShiftAdjustment = Long.parseLong(metricsConf.get(SERVER_SIDE_TIMESIFT_ADJUSTMENT, "90000")); - this.interpolationEnabled = Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true")); - this.skipAggrPatternStrings = metricsConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS); - this.timelineMetricReadHelper = new TimelineMetricReadHelper(metadataManager, true); - } - - @Override - protected void aggregate(ResultSet rs, long startTime, long endTime) throws SQLException, IOException { - // Account for time shift due to client side buffering by shifting the - // timestamps with the difference between server time and series start time - // Also, we do not want to look at the shift time period from the end as well since we can interpolate those points - // that come earlier than the expected, during the next run. 
- List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis); - // Initialize app aggregates for host metrics - appAggregator.init(); - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = - aggregateMetricsFromResultSet(rs, timeSlices); - - LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics); - appAggregator.cleanup(); - } - - @Override - protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - - List<String> metricNames = new ArrayList<>(); - boolean metricNamesNotCondition = false; - - if (!StringUtils.isEmpty(skipAggrPatternStrings)) { - LOG.info("Skipping aggregation for metric patterns : " + skipAggrPatternStrings); - metricNames.addAll(Arrays.asList(skipAggrPatternStrings.split(","))); - metricNamesNotCondition = true; - } - - Condition condition = new DefaultCondition(metricNames, null, null, null, startTime - serverTimeShiftAdjustment, - endTime, null, null, true); - condition.setMetricNamesNotCondition(metricNamesNotCondition); - condition.setNoLimit(); - condition.setFetchSize(resultsetFetchSize); - condition.setStatement(String.format(GET_METRIC_SQL, - METRICS_RECORD_TABLE_NAME)); - // Retaining order of the row-key avoids client side merge sort. 
- condition.addOrderByColumn("UUID"); - condition.addOrderByColumn("SERVER_TIME"); - return condition; - } - - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices) - throws SQLException, IOException { - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - TimelineMetric metric = null; - Map<String, MutableInt> hostedAppCounter = new HashMap<>(); - if (rs.next()) { - metric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); - - // Call slice after all rows for a host are read - while (rs.next()) { - TimelineMetric nextMetric = timelineMetricReadHelper.getTimelineMetricFromResultSet(rs); - // If rows belong to same host combine them before slicing. This - // avoids issues across rows that belong to same hosts but get - // counted as coming from different ones. - if (metric.equalsExceptTime(nextMetric)) { - metric.addMetricValues(nextMetric.getMetricValues()); - } else { - // Process the current metric - int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); - if (!hostedAppCounter.containsKey(metric.getAppId())) { - hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); - } else { - int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue(); - if (currentHostCount < numHosts) { - hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); - } - } - metric = nextMetric; - } - } - } - // Process last metric - if (metric != null) { - int numHosts = processAggregateClusterMetrics(aggregateClusterMetrics, metric, timeSlices); - if (!hostedAppCounter.containsKey(metric.getAppId())) { - hostedAppCounter.put(metric.getAppId(), new MutableInt(numHosts)); - } else { - int currentHostCount = hostedAppCounter.get(metric.getAppId()).intValue(); - if (currentHostCount < numHosts) { - hostedAppCounter.put(metric.getAppId(), new 
MutableInt(numHosts)); - } - } - } - - // Add app level aggregates to save - aggregateClusterMetrics.putAll(appAggregator.getAggregateClusterMetrics()); - - // Add liveHosts per AppId metrics. - long timestamp = timeSlices.get(timeSlices.size() - 1)[1]; - processLiveAppCountMetrics(aggregateClusterMetrics, hostedAppCounter, timestamp); - - return aggregateClusterMetrics; - } - - /** - * Slice metric values into interval specified by : - * timeline.metrics.cluster.aggregator.minute.timeslice.interval - * Normalize value by averaging them within the interval - */ - protected int processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, - TimelineMetric metric, List<Long[]> timeSlices) { - // Create time slices - TimelineMetricMetadataKey appKey = new TimelineMetricMetadataKey(metric.getMetricName(), metric.getAppId(), metric.getInstanceId()); - TimelineMetricMetadata metricMetadata = metadataManagerInstance.getMetadataCacheValue(appKey); - - if (metricMetadata != null && !metricMetadata.isSupportsAggregates()) { - LOG.debug("Skipping cluster aggregation for " + metric.getMetricName()); - return 0; - } - - Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled); - - return aggregateClusterMetricsFromSlices(clusterMetrics, aggregateClusterMetrics, metric.getHostName()); - } - - protected int aggregateClusterMetricsFromSlices(Map<TimelineClusterMetric, Double> clusterMetrics, - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, - String hostname) { - - int numHosts = 0; - if (clusterMetrics != null && !clusterMetrics.isEmpty()) { - for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry : clusterMetrics.entrySet()) { - - TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey(); - Double avgValue = clusterMetricEntry.getValue(); - - MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric); - - if 
(aggregate == null) { - aggregate = new MetricClusterAggregate(avgValue, 1, null, avgValue, avgValue); - aggregateClusterMetrics.put(clusterMetric, aggregate); - } else { - aggregate.updateSum(avgValue); - aggregate.updateNumberOfHosts(1); - aggregate.updateMax(avgValue); - aggregate.updateMin(avgValue); - } - - numHosts = aggregate.getNumberOfHosts(); - // Update app level aggregates - appAggregator.processTimelineClusterMetric(clusterMetric, hostname, avgValue); - } - } - return numHosts; - } - - /* Add cluster metric for number of hosts that are hosting an appId */ - protected void processLiveAppCountMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, - Map<String, MutableInt> appHostsCount, long timestamp) { - - for (Map.Entry<String, MutableInt> appHostsEntry : appHostsCount.entrySet()) { - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric( - liveHostsMetricName, appHostsEntry.getKey(), null, timestamp); - - Integer numOfHosts = appHostsEntry.getValue().intValue(); - - MetricClusterAggregate metricClusterAggregate = new MetricClusterAggregate( - (double) numOfHosts, 1, null, (double) numOfHosts, (double) numOfHosts); - - metadataManagerInstance.getUuid(timelineClusterMetric); - - aggregateClusterMetrics.put(timelineClusterMetric, metricClusterAggregate); - } - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java deleted file mode 100644 index dc31086..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices; - -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.mutable.MutableInt; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricDistributedCache; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; - -public class TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond { - private TimelineMetricDistributedCache distributedCache; - public TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond, TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, String aggregatorDisabledParam, String inputTableName, String outputTableName, - Long nativeTimeRangeDelay, - Long timeSliceInterval, - MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache) { - super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, 
inputTableName, outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController); - this.distributedCache = distributedCache; - } - - @Override - public boolean doWork(long startTime, long endTime) { - LOG.info("Start aggregation cycle @ " + new Date() + ", " + - "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); - try { - Map<String, Double> caheMetrics; - if (LOG.isDebugEnabled()) { - caheMetrics = distributedCache.getPointInTimeCacheMetrics(); - LOG.debug("Ignite metrics before eviction : " + caheMetrics); - } - - LOG.info("Trying to evict elements from cache"); - Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = distributedCache.evictMetricAggregates(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment); - LOG.info(String.format("Evicted %s elements from cache.", metricsFromCache.size())); - - if (LOG.isDebugEnabled()) { - caheMetrics = distributedCache.getPointInTimeCacheMetrics(); - LOG.debug("Ignite metrics after eviction : " + caheMetrics); - } - - List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment, endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis); - Map<TimelineClusterMetric, MetricClusterAggregate> result = aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeSlices); - - LOG.info("Saving " + result.size() + " metric aggregates."); - hBaseAccessor.saveClusterAggregateRecords(result); - LOG.info("End aggregation cycle @ " + new Date()); - return true; - } catch (Exception e) { - LOG.error("Exception during aggregation. ", e); - return false; - } - } - - //Slices in cache could be different from aggregate slices, so need to recalculate. 
Counts hosted apps - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) { - //TODO add basic interpolation - //TODO investigate if needed, maybe add config to disable/enable - //count hosted apps - Map<String, MutableInt> hostedAppCounter = new HashMap<>(); - for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) { - int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts(); - String appId = clusterMetricAggregateEntry.getKey().getAppId(); - if (!hostedAppCounter.containsKey(appId)) { - hostedAppCounter.put(appId, new MutableInt(numHosts)); - } else { - int currentHostCount = hostedAppCounter.get(appId).intValue(); - if (currentHostCount < numHosts) { - hostedAppCounter.put(appId, new MutableInt(numHosts)); - } - } - } - - // Add liveHosts per AppId metrics. - processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]); - - return metricsFromCache; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java deleted file mode 100644 index a75d2c4..0000000 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricFilteringHostAggregator.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;

/**
 * Host aggregator that skips hosts whose aggregates were already posted for the
 * current time window, as tracked in {@code postedAggregatedMap}.  Only the
 * remaining ("not yet aggregated") hosts are read from the metrics table.
 */
public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
  private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);

  private final TimelineMetricMetadataManager metricMetadataManager;
  // hostname -> timestamp of the last posted aggregate for that host
  private final ConcurrentHashMap<String, Long> postedAggregatedMap;

  public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
                                               TimelineMetricMetadataManager metricMetadataManager,
                                               PhoenixHBaseAccessor hBaseAccessor,
                                               Configuration metricsConf,
                                               String checkpointLocation,
                                               Long sleepIntervalMillis,
                                               Integer checkpointCutOffMultiplier,
                                               String hostAggregatorDisabledParam,
                                               String tableName,
                                               String outputTableName,
                                               Long nativeTimeRangeDelay,
                                               MetricCollectorHAController haController,
                                               ConcurrentHashMap<String, Long> postedAggregatedMap) {
    super(aggregatorName, metricMetadataManager, hBaseAccessor, metricsConf,
        checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier,
        hostAggregatorDisabledParam, tableName, outputTableName,
        nativeTimeRangeDelay, haController);
    this.metricMetadataManager = metricMetadataManager;
    this.postedAggregatedMap = postedAggregatedMap;
  }

  /**
   * Builds the query condition restricted to UUIDs of hosts that have NOT yet
   * posted an aggregate inside (startTime, endTime].
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    // Collect hosts whose posted-aggregate timestamp falls inside this window.
    List<String> aggregatedInWindow = new ArrayList<>();
    for (Map.Entry<String, Long> posted : postedAggregatedMap.entrySet()) {
      long postTime = posted.getValue();
      if (postTime > startTime && postTime <= endTime) {
        aggregatedInWindow.add(posted.getKey());
      }
    }

    // Everything else still needs to be aggregated by this run.
    List<String> remainingHosts = metricMetadataManager.getNotLikeHostnames(aggregatedInWindow);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + aggregatedInWindow);
      LOG.debug("Hostnames that will be aggregated : " + remainingHosts);
    }

    List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), remainingHosts, "", "");

    Condition condition =
        new DefaultCondition(uuids, null, null, null, null, startTime, endTime, null, null, true);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
    // Retaining order of the row-key avoids client side merge sort.
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;

/**
 * Per-host aggregator: reads raw host metric rows (ordered by UUID then
 * SERVER_TIME), rolls them up into one {@link MetricHostAggregate} per metric
 * key, and persists the result into {@code outputTableName}.
 */
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
  private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
  // Decodes UUID-keyed rows back into TimelineMetric / MetricHostAggregate objects.
  TimelineMetricReadHelper readHelper;

  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
                                      TimelineMetricMetadataManager metricMetadataManager,
                                      PhoenixHBaseAccessor hBaseAccessor,
                                      Configuration metricsConf,
                                      String checkpointLocation,
                                      Long sleepIntervalMillis,
                                      Integer checkpointCutOffMultiplier,
                                      String hostAggregatorDisabledParam,
                                      String tableName,
                                      String outputTableName,
                                      Long nativeTimeRangeDelay,
                                      MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
        sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
        tableName, outputTableName, nativeTimeRangeDelay, haController);
    // ignoreInstance = false: instance ids take part in the metric key.
    readHelper = new TimelineMetricReadHelper(metricMetadataManager, false);
  }

  /**
   * Rolls the result set up into per-metric aggregates and saves them.
   *
   * @param rs        rows from the query built by {@link #prepareMetricQueryCondition}
   * @param startTime window start (unused here; window end is stamped on the rows)
   * @param endTime   window end, used as the aggregate row's start time
   */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {

    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);

    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
  }

  /**
   * Builds the SELECT over the raw metrics table for (startTime, endTime],
   * ordered by the row key so rows for the same metric arrive contiguously.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    Condition condition = new DefaultCondition(null, null, null, null, startTime,
      endTime, null, null, true);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
    // Retaining order of the row-key avoids client side merge sort.
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }

  /**
   * Single pass over the UUID-ordered result set.  Relies on the ORDER BY above:
   * all rows for one metric key are contiguous, so a metric switch is detected
   * by comparing against the previous row's key.
   */
  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
      throws IOException, SQLException {
    TimelineMetric existingMetric = null;
    MetricHostAggregate hostAggregate = null;
    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<TimelineMetric, MetricHostAggregate>();


    while (rs.next()) {
      TimelineMetric currentMetric =
        readHelper.getTimelineMetricKeyFromResultSet(rs);
      MetricHostAggregate currentHostAggregate =
        readHelper.getMetricHostAggregateFromResultSet(rs);

      if (existingMetric == null) {
        // First row: open an (empty) aggregate for this metric key.  The row's
        // values are folded in by the equalsExceptTime branch below, since
        // existingMetric and currentMetric are the same object here.
        existingMetric = currentMetric;
        currentMetric.setStartTime(endTime);
        hostAggregate = new MetricHostAggregate();
        hostAggregateMap.put(currentMetric, hostAggregate);
      }

      if (existingMetric.equalsExceptTime(currentMetric)) {
        // Recalculate totals with current metric
        hostAggregate.updateAggregates(currentHostAggregate);
      } else {
        // Switched over to a new metric - save existing - create new aggregate
        currentMetric.setStartTime(endTime);
        hostAggregate = new MetricHostAggregate();
        hostAggregate.updateAggregates(currentHostAggregate);
        hostAggregateMap.put(currentMetric, hostAggregate);
        existingMetric = currentMetric;
      }
    }
    return hostAggregateMap;
  }


}
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java deleted file mode 100644 index 5d31b51..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;


import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.TreeMap;

import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;

/**
 * Translates Phoenix result-set rows (keyed by metric UUID) back into timeline
 * metric objects, resolving the UUID through the metadata manager.
 */
public class TimelineMetricReadHelper {

  // When true, the instance id is dropped from decoded metric keys.
  private boolean ignoreInstance = false;
  private TimelineMetricMetadataManager metadataManagerInstance = null;

  public TimelineMetricReadHelper() {}

  public TimelineMetricReadHelper(boolean ignoreInstance) {
    this.ignoreInstance = ignoreInstance;
  }

  public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager) {
    this.metadataManagerInstance = timelineMetricMetadataManager;
  }

  public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager, boolean ignoreInstance) {
    this.metadataManagerInstance = timelineMetricMetadataManager;
    this.ignoreInstance = ignoreInstance;
  }

  /**
   * Reads a full metric row including the JSON-encoded METRICS value map.
   */
  public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs)
      throws SQLException, IOException {
    TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs);
    TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS"));
    metric.setMetricValues(sortedByTimeMetrics);
    return metric;
  }

  /**
   * Reads one aggregate row and reduces it to a single value according to the
   * requested read function (AVG / MIN / MAX / SUM; AVG is the default).
   *
   * @param f read function; {@code null} means {@link Function#DEFAULT_VALUE_FUNCTION}
   */
  public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs,
      Function f) throws SQLException, IOException {

    byte[] uuid = rs.getBytes("UUID");
    TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);
    Function function = (f != null) ? f : Function.DEFAULT_VALUE_FUNCTION;
    SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
      timelineMetric.getMetricName() + function.getSuffix(),
      timelineMetric.getAppId(),
      timelineMetric.getInstanceId(),
      timelineMetric.getHostName(),
      rs.getLong("SERVER_TIME")
    );

    double value;
    switch (function.getReadFunction()) {
      case AVG:
        // NOTE: if METRIC_COUNT is 0 this yields NaN/Infinity (double division).
        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
        break;
      case MIN:
        value = rs.getDouble("METRIC_MIN");
        break;
      case MAX:
        value = rs.getDouble("METRIC_MAX");
        break;
      case SUM:
        // Fixed: SUM previously divided by METRIC_COUNT, which returned the
        // average instead of the sum (identical to the AVG branch).
        value = rs.getDouble("METRIC_SUM");
        break;
      default:
        value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT");
        break;
    }

    metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);

    return metric;
  }

  /**
   * Returns common part of timeline metrics record without the values.
   */
  public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs)
      throws SQLException {

    byte[] uuid = rs.getBytes("UUID");
    TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid);
    if (ignoreInstance) {
      metric.setInstanceId(null);
    }
    metric.setStartTime(rs.getLong("SERVER_TIME"));
    return metric;
  }

  /** Cluster aggregate row (precision table: host count in HOSTS_COUNT). */
  public MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs)
      throws SQLException {
    MetricClusterAggregate agg = new MetricClusterAggregate();
    agg.setSum(rs.getDouble("METRIC_SUM"));
    agg.setMax(rs.getDouble("METRIC_MAX"));
    agg.setMin(rs.getDouble("METRIC_MIN"));
    agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT"));

    agg.setDeviation(0.0);

    return agg;
  }

  /** Cluster aggregate row (time-rolled-up table: count in METRIC_COUNT). */
  public MetricClusterAggregate getMetricClusterTimeAggregateFromResultSet(ResultSet rs)
      throws SQLException {
    MetricClusterAggregate agg = new MetricClusterAggregate();
    agg.setSum(rs.getDouble("METRIC_SUM"));
    agg.setMax(rs.getDouble("METRIC_MAX"));
    agg.setMin(rs.getDouble("METRIC_MIN"));
    agg.setNumberOfHosts(rs.getInt("METRIC_COUNT"));

    agg.setDeviation(0.0);

    return agg;
  }

  /** Decodes the cluster metric key (no host) for the current row. */
  public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException {

    byte[] uuid = rs.getBytes("UUID");
    TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid);

    return new TimelineClusterMetric(
      timelineMetric.getMetricName(),
      timelineMetric.getAppId(),
      ignoreInstance ? null : timelineMetric.getInstanceId(),
      rs.getLong("SERVER_TIME"));
  }

  /** Host aggregate row: sum/max/min plus sample count. */
  public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs)
      throws SQLException {
    MetricHostAggregate metricHostAggregate = new MetricHostAggregate();
    metricHostAggregate.setSum(rs.getDouble("METRIC_SUM"));
    metricHostAggregate.setMax(rs.getDouble("METRIC_MAX"));
    metricHostAggregate.setMin(rs.getDouble("METRIC_MIN"));
    metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT"));

    metricHostAggregate.setDeviation(0.0);
    return metricHostAggregate;
  }

  /** Decodes only the metric key (no values) for the current row. */
  public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs)
      throws SQLException, IOException {
    byte[] uuid = rs.getBytes("UUID");
    return metadataManagerInstance.getMetricFromUuid(uuid);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;

/**
 * Down sampler that keeps only the top-N series (by a configurable function)
 * for each configured metric pattern when writing into the aggregate tables.
 */
public class TopNDownSampler implements CustomDownSampler {

  private TopNConfig topNConfig;
  private static final Log LOG = LogFactory.getLog(TopNDownSampler.class);
  // Comma-separated metric name patterns this sampler applies to.
  protected String metricPatterns;

  /**
   * Factory: reads "…topn.*" keys from the sampler configuration.
   * Defaults to top 10 when no explicit value is configured.
   */
  public static TopNDownSampler fromConfig(Map<String, String> conf) {
    String patterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn." +
      DownSamplerUtils.downSamplerMetricPatternsConfig);

    String topNString = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.value");
    Integer topNValue = (topNString != null) ? Integer.valueOf(topNString) : 10;
    String topNFunction = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.function");

    return new TopNDownSampler(new TopNConfig(topNValue, topNFunction, false), patterns);
  }

  public TopNDownSampler(TopNConfig topNConfig, String metricPatterns) {
    this.topNConfig = topNConfig;
    this.metricPatterns = metricPatterns;
  }

  /** A usable configuration needs a positive N and at least one pattern. */
  @Override
  public boolean validateConfigs() {
    return topNConfig != null
        && topNConfig.getTopN() > 0
        && StringUtils.isNotEmpty(metricPatterns);
  }

  /**
   * Prepare downsampling SELECT statement(s) used to determine the data to be
   * written into the Aggregate table — one statement per metric pattern.
   *
   * @param startTime window start
   * @param endTime   window end (also stamped as the aggregate server time)
   * @param tableName source table; "RECORD" tables use the host SQL template
   * @return list of formatted SELECT statements
   */
  @Override
  public List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName) {
    List<String> stmts = new ArrayList<>();

    Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction());
    String columnSelect = TopNCondition.getColumnSelect(new Function(readFunction, null));

    //TODO : Need a better way to find out what kind of aggregation the current one is.
    String sqlTemplate = tableName.contains("RECORD")
        ? TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL
        : TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL;

    for (String metricPattern : Arrays.asList(metricPatterns.split(","))) {
      String metricPatternClause = "'" + metricPattern + "'";
      stmts.add(String.format(sqlTemplate,
        endTime, columnSelect, columnSelect, columnSelect, tableName, metricPatternClause,
        startTime, endTime, columnSelect, topNConfig.getTopN()));
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("DownSampling Stmt: " + stmts.toString());
    }

    return stmts;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;

/**
 * v2 cluster aggregator: performs the roll-up entirely server side with a
 * single UPSERT SELECT ... GROUP BY statement, so {@link #aggregate} only
 * has to log the completed window.
 */
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
  // Column holding the per-row count being summed: HOSTS_COUNT when reading the
  // precision cluster-aggregate table, METRIC_COUNT for the rolled-up tables.
  private final String aggregateColumnName;

  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
                                         PhoenixHBaseAccessor hBaseAccessor,
                                         Configuration metricsConf,
                                         String checkpointLocation,
                                         Long sleepIntervalMillis,
                                         Integer checkpointCutOffMultiplier,
                                         String hostAggregatorDisabledParam,
                                         String inputTableName,
                                         String outputTableName,
                                         Long nativeTimeRangeDelay,
                                         MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier,
      hostAggregatorDisabledParam, inputTableName, outputTableName,
      nativeTimeRangeDelay, haController);

    aggregateColumnName = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)
        ? "HOSTS_COUNT"
        : "METRIC_COUNT";
  }

  /**
   * Builds a do-update condition whose statement is a server-side
   * UPSERT SELECT, e.g.:
   *
   *   UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
   *     SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
   *   SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
   *     SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
   *   FROM METRIC_AGGREGATE WHERE SERVER_TIME >= start AND SERVER_TIME < end
   *   GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS;
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    EmptyCondition condition = new EmptyCondition();
    condition.setDoUpdate(true);

    String statement = String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
      outputTableName, endTime, aggregateColumnName, tableName,
      getDownsampledMetricSkipClause(), startTime, endTime);
    condition.setStatement(statement);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Condition: " + condition.toString());
    }

    return condition;
  }

  /** The UPSERT SELECT already wrote the output; just record the window. */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
    LOG.info("Aggregated cluster metrics for " + outputTableName +
      ", with startTime = " + new Date(startTime) +
      ", endTime = " + new Date(endTime));
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java deleted file mode 100644 index a15ab2e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;

/**
 * v2 filtering host aggregator: restricts the server-side UPSERT SELECT to
 * hosts that have not yet posted an aggregate for the current window,
 * based on {@code postedAggregatedMap}.
 */
public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
  private TimelineMetricMetadataManager metricMetadataManager;
  // hostname -> timestamp of the last posted aggregate for that host
  private ConcurrentHashMap<String, Long> postedAggregatedMap;

  public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
                                               TimelineMetricMetadataManager metricMetadataManager,
                                               PhoenixHBaseAccessor hBaseAccessor,
                                               Configuration metricsConf,
                                               String checkpointLocation,
                                               Long sleepIntervalMillis,
                                               Integer checkpointCutOffMultiplier,
                                               String hostAggregatorDisabledParam,
                                               String tableName,
                                               String outputTableName,
                                               Long nativeTimeRangeDelay,
                                               MetricCollectorHAController haController,
                                               ConcurrentHashMap<String, Long> postedAggregatedMap) {
    super(aggregatorName,
      hBaseAccessor, metricsConf,
      checkpointLocation,
      sleepIntervalMillis,
      checkpointCutOffMultiplier,
      hostAggregatorDisabledParam,
      tableName,
      outputTableName,
      nativeTimeRangeDelay,
      haController);
    this.metricMetadataManager = metricMetadataManager;
    this.postedAggregatedMap = postedAggregatedMap;
  }

  /**
   * Builds the do-update UPSERT SELECT restricted to UUIDs of hosts that were
   * not already aggregated inside (startTime, endTime].
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    // Hosts whose posted-aggregate timestamp falls inside this window.
    List<String> aggregatedHostnames = new ArrayList<>();
    for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
      if (entry.getValue() > startTime && entry.getValue() <= endTime) {
        aggregatedHostnames.add(entry.getKey());
      }
    }
    List<String> notAggregatedHostnames = metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + aggregatedHostnames);
      LOG.debug("Hostnames that will be aggregated : " + notAggregatedHostnames);
    }
    List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), notAggregatedHostnames, "", "");

    EmptyCondition condition = new EmptyCondition();
    condition.setDoUpdate(true);

    condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
      outputTableName, endTime, tableName,
      getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));

    if (LOG.isDebugEnabled()) {
      LOG.debug("Condition: " + condition.toString());
    }

    return condition;
  }

  /**
   * Builds a "(UUID LIKE '%a' OR UUID LIKE '%b') AND " SQL fragment for the
   * given uuids.
   *
   * Fixed: the original returned a dangling "(" when the list was empty,
   * which corrupted the WHERE clause; an empty list now yields "" so the
   * surrounding statement stays valid SQL.
   */
  private String getIncludedUuidsClause(List<byte[]> uuids) {
    if (CollectionUtils.isEmpty(uuids)) {
      return "";
    }

    StringBuilder sb = new StringBuilder();
    sb.append("(");

    //LIKE clause
    // (UUID LIKE ? OR UUID LIKE ?) AND
    for (int i = 0; i < uuids.size(); i++) {
      sb.append("UUID");
      sb.append(" LIKE ");
      sb.append("'%");
      // NOTE(review): uuid bytes are decoded with the platform default
      // charset here — confirm the uuid encoding round-trips safely.
      sb.append(new String(uuids.get(i)));
      sb.append("'");

      if (i == uuids.size() - 1) {
        sb.append(") AND ");
      } else {
        sb.append(" OR ");
      }
    }
    return sb.toString();
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;

/**
 * v2 host aggregator: the whole roll-up is executed server side via a single
 * UPSERT SELECT ... GROUP BY statement, so {@link #aggregate} only logs the
 * completed window.
 */
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {

  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
                                      PhoenixHBaseAccessor hBaseAccessor,
                                      Configuration metricsConf,
                                      String checkpointLocation,
                                      Long sleepIntervalMillis,
                                      Integer checkpointCutOffMultiplier,
                                      String hostAggregatorDisabledParam,
                                      String tableName,
                                      String outputTableName,
                                      Long nativeTimeRangeDelay,
                                      MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
      tableName, outputTableName, nativeTimeRangeDelay, haController);
  }

  /** The UPSERT SELECT already wrote the output; just record the window. */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {

    LOG.info("Aggregated host metrics for " + outputTableName +
      ", with startTime = " + new Date(startTime) +
      ", endTime = " + new Date(endTime));
  }

  /** Builds the do-update UPSERT SELECT over (startTime, endTime]. */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    EmptyCondition condition = new EmptyCondition();
    condition.setDoUpdate(true);

    String statement = String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
      outputTableName, endTime, tableName,
      getDownsampledMetricSkipClause(), startTime, endTime);
    condition.setStatement(statement);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Condition: " + condition.toString());
    }

    return condition;
  }
}
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.participant.StateMachineEngine; - -public class AggregationTaskRunner { - private final String instanceName; - private final String zkAddress; - private final String clusterName; - private HelixManager manager; - private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class); - private CheckpointManager checkpointManager; - // Map partition name to an aggregator dimension - static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>(); - // Ownership flags to be set by the State transitions - private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false); - private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false); - - public enum AGGREGATOR_NAME { - METRIC_RECORD_MINUTE, - METRIC_RECORD_HOURLY, - METRIC_RECORD_DAILY, - METRIC_AGGREGATE_SECOND, - METRIC_AGGREGATE_MINUTE, - METRIC_AGGREGATE_HOURLY, - METRIC_AGGREGATE_DAILY, - 
} - - public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>(); - - static { - ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily"); - - // Partition name to task assignment - PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER); - PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST); - } - - public AggregationTaskRunner(String instanceName, String zkAddress, String clusterName) { - this.instanceName = instanceName; - this.zkAddress = zkAddress; - this.clusterName = clusterName; - } - - public void initialize() throws Exception { - manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, - InstanceType.PARTICIPANT, zkAddress); - - OnlineOfflineStateModelFactory stateModelFactory = - new OnlineOfflineStateModelFactory(instanceName, this); - - StateMachineEngine stateMach = manager.getStateMachineEngine(); - stateMach.registerStateModelFactory(DEFAULT_STATE_MODEL, stateModelFactory); - manager.connect(); - - checkpointManager = new CheckpointManager(manager.getHelixPropertyStore()); - } - - public boolean performsClusterAggregation() { - return performsClusterAggregation.get(); - } - - public boolean performsHostAggregation() { - return performsHostAggregation.get(); - } - - public CheckpointManager getCheckpointManager() { - return checkpointManager; - } - - public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) 
{ - switch (type) { - case HOST: - performsHostAggregation.set(true); - LOG.info("Set host aggregator function for : " + instanceName); - break; - case CLUSTER: - performsClusterAggregation.set(true); - LOG.info("Set cluster aggregator function for : " + instanceName); - } - } - - public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) { - switch (type) { - case HOST: - performsHostAggregation.set(false); - LOG.info("Unset host aggregator function for : " + instanceName); - break; - case CLUSTER: - performsClusterAggregation.set(false); - LOG.info("Unset cluster aggregator function for : " + instanceName); - } - } - - /** - * Disconnect participant before controller shutdown - */ - void stop() { - manager.disconnect(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java deleted file mode 100644 index 3293ead..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; - -import org.I0Itec.zkclient.DataUpdater; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.helix.AccessOption; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.zookeeper.data.Stat; - -public class CheckpointManager { - private final ZkHelixPropertyStore<ZNRecord> propertyStore; - private static final Log LOG = LogFactory.getLog(CheckpointManager.class); - - static final String ZNODE_FIELD = "checkpoint"; - static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS"; - - public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) { - this.propertyStore = propertyStore; - } - - /** - * Read aggregator checkpoint from zookeeper - * - * @return timestamp - */ - public long readCheckpoint(AGGREGATOR_NAME aggregatorName) { - String path = getCheckpointZKPath(aggregatorName); - LOG.debug("Reading checkpoint at " + path); - Stat stat = new Stat(); - ZNRecord znRecord = 
propertyStore.get(path, stat, AccessOption.PERSISTENT); - if (LOG.isTraceEnabled()) { - LOG.trace("Stat => " + stat); - } - long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1; - LOG.debug("Checkpoint value = " + checkpoint); - return checkpoint; - } - - /** - * Write aggregator checkpoint in zookeeper - * - * @param value timestamp - * @return sucsess - */ - public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) { - String path = getCheckpointZKPath(aggregatorName); - LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value)); - return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT); - } - - static class CheckpointDataUpdater implements DataUpdater<ZNRecord> { - final String path; - final Long value; - - public CheckpointDataUpdater(String path, Long value) { - this.path = path; - this.value = value; - } - - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - currentData = new ZNRecord(path); - } - currentData.setLongField(ZNODE_FIELD, value); - return currentData; - } - } - - String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) { - StringBuilder sb = new StringBuilder("/"); - sb.append(CHECKPOINT_PATH_PREFIX); - sb.append("/"); - sb.append(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName)); - return sb.toString(); - } -}
