/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline.aggregators;

import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.ambari.metrics.core.timeline.TimelineMetricDistributedCache;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;

/**
 * Second-level cluster aggregator that sources already-aggregated metrics from
 * a distributed cache (evicted per aggregation window) instead of re-reading
 * raw metrics from HBase, then persists the resulting cluster aggregates.
 */
public class TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond {

  /** Cache holding per-slice cluster aggregates posted by collectors. */
  private final TimelineMetricDistributedCache distributedCache;

  public TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond,
                                                              TimelineMetricMetadataManager metricMetadataManager,
                                                              PhoenixHBaseAccessor hBaseAccessor,
                                                              Configuration metricsConf,
                                                              String checkpointLocation,
                                                              long sleepIntervalMillis,
                                                              int checkpointCutOffMultiplier,
                                                              String aggregatorDisabledParam,
                                                              String inputTableName,
                                                              String outputTableName,
                                                              Long nativeTimeRangeDelay,
                                                              Long timeSliceInterval,
                                                              MetricCollectorHAController haController,
                                                              TimelineMetricDistributedCache distributedCache) {
    super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation,
        sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName,
        outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController);
    this.distributedCache = distributedCache;
  }

  /**
   * Runs one aggregation cycle: evicts the cache entries that fall into
   * [startTime, endTime) (shifted by the server time adjustment), recomputes
   * the live-host metrics and saves the aggregates.
   *
   * @return true when the cycle completed, false when any exception occurred.
   */
  @Override
  public boolean doWork(long startTime, long endTime) {
    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
        "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
    try {
      if (LOG.isDebugEnabled()) {
        // Point-in-time cache stats are only fetched when debug logging is on.
        Map<String, Double> cacheMetrics = distributedCache.getPointInTimeCacheMetrics();
        LOG.debug("Ignite metrics before eviction : " + cacheMetrics);
      }

      LOG.info("Trying to evict elements from cache");
      // Shift the window so cache keys (written with server time) line up.
      Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache =
          distributedCache.evictMetricAggregates(startTime - serverTimeShiftAdjustment,
              endTime - serverTimeShiftAdjustment);
      // %d: size() is an int; %s on an int count was a format-specifier slip.
      LOG.info(String.format("Evicted %d elements from cache.", metricsFromCache.size()));

      if (LOG.isDebugEnabled()) {
        Map<String, Double> cacheMetrics = distributedCache.getPointInTimeCacheMetrics();
        LOG.debug("Ignite metrics after eviction : " + cacheMetrics);
      }

      List<Long[]> timeSlices = getTimeSlices(startTime - serverTimeShiftAdjustment,
          endTime - serverTimeShiftAdjustment, timeSliceIntervalMillis);
      Map<TimelineClusterMetric, MetricClusterAggregate> result =
          aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeSlices);

      LOG.info("Saving " + result.size() + " metric aggregates.");
      hBaseAccessor.saveClusterAggregateRecords(result);
      LOG.info("End aggregation cycle @ " + new Date());
      return true;
    } catch (Exception e) {
      LOG.error("Exception during aggregation. ", e);
      return false;
    }
  }

  /**
   * Slices in cache could be different from aggregate slices, so recalculate.
   * Counts hosted apps and appends the live-host-per-app metrics to the map.
   *
   * @param metricsFromCache aggregates evicted from the cache; mutated in place.
   * @param timeSlices       aggregation slices; the end of the last slice is
   *                         used as the timestamp for the live-app metrics.
   * @return the same map, enriched with live-host-count metrics.
   */
  Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(
      Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) {
    //TODO add basic interpolation
    //TODO investigate if needed, maybe add config to disable/enable

    // For each appId keep the maximum host count observed across its metrics.
    Map<String, MutableInt> hostedAppCounter = new HashMap<>();
    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) {
      int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts();
      String appId = clusterMetricAggregateEntry.getKey().getAppId();
      MutableInt currentHostCount = hostedAppCounter.get(appId);
      if (currentHostCount == null || currentHostCount.intValue() < numHosts) {
        hostedAppCounter.put(appId, new MutableInt(numHosts));
      }
    }

    // Add liveHosts per AppId metrics, stamped at the end of the last slice.
    processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]);

    return metricsFromCache;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline.aggregators;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;

/**
 * Host aggregator that narrows its scan to hosts whose aggregates were NOT
 * already posted externally during the current window (as recorded in
 * {@code postedAggregatedMap}), avoiding duplicate aggregation work.
 */
public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
  private static final Log LOG = LogFactory.getLog(TimelineMetricFilteringHostAggregator.class);

  // Resolves hostnames to row-key UUIDs for the query condition.
  private TimelineMetricMetadataManager metricMetadataManager;
  // hostname -> timestamp at which that host's aggregates were last posted.
  private ConcurrentHashMap<String, Long> postedAggregatedMap;

  public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
                                               TimelineMetricMetadataManager metricMetadataManager,
                                               PhoenixHBaseAccessor hBaseAccessor,
                                               Configuration metricsConf,
                                               String checkpointLocation,
                                               Long sleepIntervalMillis,
                                               Integer checkpointCutOffMultiplier,
                                               String hostAggregatorDisabledParam,
                                               String tableName,
                                               String outputTableName,
                                               Long nativeTimeRangeDelay,
                                               MetricCollectorHAController haController,
                                               ConcurrentHashMap<String, Long> postedAggregatedMap) {
    super(aggregatorName, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation,
        sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
        tableName, outputTableName, nativeTimeRangeDelay, haController);
    this.metricMetadataManager = metricMetadataManager;
    this.postedAggregatedMap = postedAggregatedMap;
  }

  /**
   * Builds the scan condition restricted to hosts that still need aggregation
   * for the window (startTime, endTime].
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    // Hosts whose aggregates were already posted inside this window.
    List<String> alreadyAggregated = new ArrayList<>();
    postedAggregatedMap.forEach((hostname, postedTime) -> {
      if (postedTime > startTime && postedTime <= endTime) {
        alreadyAggregated.add(hostname);
      }
    });

    // Everything else still needs to be aggregated by us.
    List<String> pendingHostnames = metricMetadataManager.getNotLikeHostnames(alreadyAggregated);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + alreadyAggregated);
      LOG.debug("Hostnames that will be aggregated : " + pendingHostnames);
    }

    List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), pendingHostnames, "", "");

    Condition condition = new DefaultCondition(uuids, null, null, null, null, startTime,
        endTime, null, null, true);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
    // Retaining order of the row-key avoids client side merge sort.
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }
}
 */
package org.apache.ambari.metrics.core.timeline.aggregators;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;

/**
 * Aggregates per-host metrics: scans the input table ordered by (UUID,
 * SERVER_TIME), rolls consecutive rows of the same metric into a single
 * {@link MetricHostAggregate}, and writes the result to the output table.
 */
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
  private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class);
  // Decodes UUID row keys back into TimelineMetric instances.
  TimelineMetricReadHelper readHelper;

  public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName,
                                      TimelineMetricMetadataManager metricMetadataManager,
                                      PhoenixHBaseAccessor hBaseAccessor,
                                      Configuration metricsConf,
                                      String checkpointLocation,
                                      Long sleepIntervalMillis,
                                      Integer checkpointCutOffMultiplier,
                                      String hostAggregatorDisabledParam,
                                      String tableName,
                                      String outputTableName,
                                      Long nativeTimeRangeDelay,
                                      MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
      tableName, outputTableName, nativeTimeRangeDelay, haController);
    // ignoreInstance=false: instance ids are kept distinct when aggregating per host.
    readHelper = new TimelineMetricReadHelper(metricMetadataManager, false);
  }

  /**
   * Consumes the query result set, builds the per-metric aggregates and saves
   * them to {@code outputTableName}.
   */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {

    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);

    LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
    hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
  }

  /**
   * Builds the scan over the whole window with ordering that matches the
   * row-key so rows for one metric arrive contiguously.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    Condition condition = new DefaultCondition(null, null, null, null, startTime,
      endTime, null, null, true);
    condition.setNoLimit();
    condition.setFetchSize(resultsetFetchSize);
    condition.setStatement(String.format(PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL, tableName));
    // Retaining order of the row-key avoids client side merge sort.
    condition.addOrderByColumn("UUID");
    condition.addOrderByColumn("SERVER_TIME");
    return condition;
  }

  // Streams over the ordered result set, merging consecutive rows of the same
  // metric (equalsExceptTime) into one aggregate. Relies on the UUID/SERVER_TIME
  // ordering established by prepareMetricQueryCondition; each aggregate row is
  // stamped with endTime as its start time.
  private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
      throws IOException, SQLException {
    TimelineMetric existingMetric = null;
    MetricHostAggregate hostAggregate = null;
    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<TimelineMetric, MetricHostAggregate>();


    while (rs.next()) {
      TimelineMetric currentMetric =
        readHelper.getTimelineMetricKeyFromResultSet(rs);
      MetricHostAggregate currentHostAggregate =
        readHelper.getMetricHostAggregateFromResultSet(rs);

      if (existingMetric == null) {
        // First row: seed the tracking state; the equals branch below then
        // folds this same row's values into the fresh aggregate.
        existingMetric = currentMetric;
        currentMetric.setStartTime(endTime);
        hostAggregate = new MetricHostAggregate();
        hostAggregateMap.put(currentMetric, hostAggregate);
      }

      if (existingMetric.equalsExceptTime(currentMetric)) {
        // Recalculate totals with current metric
        hostAggregate.updateAggregates(currentHostAggregate);
      } else {
        // Switched over to a new metric - save existing - create new aggregate
        currentMetric.setStartTime(endTime);
        hostAggregate = new MetricHostAggregate();
        hostAggregate.updateAggregates(currentHostAggregate);
        hostAggregateMap.put(currentMetric, hostAggregate);
        existingMetric = currentMetric;
      }
    }
    return hostAggregateMap;
  }


}
+ */ +package org.apache.ambari.metrics.core.timeline.aggregators; + + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.TreeMap; + +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; + +public class TimelineMetricReadHelper { + + private boolean ignoreInstance = false; + private TimelineMetricMetadataManager metadataManagerInstance = null; + + public TimelineMetricReadHelper() {} + + public TimelineMetricReadHelper(boolean ignoreInstance) { + this.ignoreInstance = ignoreInstance; + } + + public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager) { + this.metadataManagerInstance = timelineMetricMetadataManager; + } + + public TimelineMetricReadHelper(TimelineMetricMetadataManager timelineMetricMetadataManager, boolean ignoreInstance) { + this.metadataManagerInstance = timelineMetricMetadataManager; + this.ignoreInstance = ignoreInstance; + } + + public TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = getTimelineMetricCommonsFromResultSet(rs); + TreeMap<Long, Double> sortedByTimeMetrics = PhoenixHBaseAccessor.readMetricFromJSON(rs.getString("METRICS")); + metric.setMetricValues(sortedByTimeMetrics); + return metric; + } + + public SingleValuedTimelineMetric getAggregatedTimelineMetricFromResultSet(ResultSet rs, + Function f) throws SQLException, IOException { + + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); + Function function = (f != null) ? 
f : Function.DEFAULT_VALUE_FUNCTION; + SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric( + timelineMetric.getMetricName() + function.getSuffix(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timelineMetric.getHostName(), + rs.getLong("SERVER_TIME") + ); + + double value; + switch(function.getReadFunction()){ + case AVG: + value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + break; + case MIN: + value = rs.getDouble("METRIC_MIN"); + break; + case MAX: + value = rs.getDouble("METRIC_MAX"); + break; + case SUM: + value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + break; + default: + value = rs.getDouble("METRIC_SUM") / rs.getInt("METRIC_COUNT"); + break; + } + + metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value); + + return metric; + } + + /** + * Returns common part of timeline metrics record without the values. + */ + public TimelineMetric getTimelineMetricCommonsFromResultSet(ResultSet rs) + throws SQLException { + + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric metric = metadataManagerInstance.getMetricFromUuid(uuid); + if (ignoreInstance) { + metric.setInstanceId(null); + } + metric.setStartTime(rs.getLong("SERVER_TIME")); + return metric; + } + + public MetricClusterAggregate getMetricClusterAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricClusterAggregate agg = new MetricClusterAggregate(); + agg.setSum(rs.getDouble("METRIC_SUM")); + agg.setMax(rs.getDouble("METRIC_MAX")); + agg.setMin(rs.getDouble("METRIC_MIN")); + agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT")); + + agg.setDeviation(0.0); + + return agg; + } + + public MetricClusterAggregate getMetricClusterTimeAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricClusterAggregate agg = new MetricClusterAggregate(); + agg.setSum(rs.getDouble("METRIC_SUM")); + agg.setMax(rs.getDouble("METRIC_MAX")); + agg.setMin(rs.getDouble("METRIC_MIN")); + agg.setNumberOfHosts(rs.getInt("METRIC_COUNT")); 
+ + agg.setDeviation(0.0); + + return agg; + } + + public TimelineClusterMetric fromResultSet(ResultSet rs) throws SQLException { + + byte[] uuid = rs.getBytes("UUID"); + TimelineMetric timelineMetric = metadataManagerInstance.getMetricFromUuid(uuid); + + return new TimelineClusterMetric( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + ignoreInstance ? null : timelineMetric.getInstanceId(), + rs.getLong("SERVER_TIME")); + } + + public MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); + metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); + metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); + metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); + + metricHostAggregate.setDeviation(0.0); + return metricHostAggregate; + } + + public TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) + throws SQLException, IOException { + byte[] uuid = rs.getBytes("UUID"); + return metadataManagerInstance.getMetricFromUuid(uuid); + } +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java new file mode 100644 index 0000000..e28d465 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TopNDownSampler.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.timeline.aggregators; + +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.ambari.metrics.core.timeline.query.TopNCondition; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; + +public class TopNDownSampler implements CustomDownSampler { + + private TopNConfig topNConfig; + private static final Log LOG = LogFactory.getLog(TopNDownSampler.class); + protected String metricPatterns; + + public static TopNDownSampler fromConfig(Map<String, String> conf) { + String metricPatterns = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn." + + DownSamplerUtils.downSamplerMetricPatternsConfig); + + String topNString = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.value"); + Integer topNValue = topNString != null ? 
Integer.valueOf(topNString) : 10; + String topNFunction = conf.get(DownSamplerUtils.downSamplerConfigPrefix + "topn.function"); + + return new TopNDownSampler(new TopNConfig(topNValue, topNFunction, false), metricPatterns); + } + + public TopNDownSampler(TopNConfig topNConfig, String metricPatterns) { + this.topNConfig = topNConfig; + this.metricPatterns = metricPatterns; + } + + @Override + public boolean validateConfigs() { + if (topNConfig == null) { + return false; + } + + if (topNConfig.getTopN() <= 0) { + return false; + } + + if (StringUtils.isEmpty(metricPatterns)) { + return false; + } + + return true; + } + + /** + * Prepare downsampling SELECT statement(s) used to determine the data to be written into the Aggregate table. + * @param startTime + * @param endTime + * @param tableName + * @return + */ + @Override + public List<String> prepareDownSamplingStatement(Long startTime, Long endTime, String tableName) { + List<String> stmts = new ArrayList<>(); + + Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction()); + Function function = new Function(readFunction, null); + String columnSelect = TopNCondition.getColumnSelect(function); + + List<String> metricPatternList = Arrays.asList(metricPatterns.split(",")); + + for (String metricPattern : metricPatternList) { + String metricPatternClause = "'" + metricPattern + "'"; + //TODO : Need a better way to find out what kind of aggregation the current one is. 
+ if (tableName.contains("RECORD")) { + stmts.add(String.format(TOPN_DOWNSAMPLER_HOST_METRIC_SELECT_SQL, + endTime, columnSelect, columnSelect, columnSelect, tableName, metricPatternClause, + startTime, endTime, columnSelect, topNConfig.getTopN())); + } else { + stmts.add(String.format(TOPN_DOWNSAMPLER_CLUSTER_METRIC_SELECT_SQL, + endTime, columnSelect, columnSelect, columnSelect, tableName, metricPatternClause, + startTime, endTime, columnSelect, topNConfig.getTopN())); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("DownSampling Stmt: " + stmts.toString()); + } + + return stmts; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java new file mode 100644 index 0000000..9c255e7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricClusterAggregator.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 * You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline.aggregators.v2;

import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL;
import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.aggregators.AbstractTimelineAggregator;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.ambari.metrics.core.timeline.query.Condition;

/**
 * v2 cluster aggregator that pushes the whole roll-up into Phoenix as a single
 * server-side UPSERT ... SELECT ... GROUP BY statement; no rows are streamed
 * back to the collector.
 */
public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator {
  // Column summed into METRIC_COUNT of the output table; depends on whether
  // the input is the first-level aggregate table (HOSTS_COUNT) or an
  // already time-aggregated table (METRIC_COUNT).
  private final String aggregateColumnName;

  public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName,
                                         PhoenixHBaseAccessor hBaseAccessor,
                                         Configuration metricsConf,
                                         String checkpointLocation,
                                         Long sleepIntervalMillis,
                                         Integer checkpointCutOffMultiplier,
                                         String hostAggregatorDisabledParam,
                                         String inputTableName,
                                         String outputTableName,
                                         Long nativeTimeRangeDelay,
                                         MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier,
      hostAggregatorDisabledParam, inputTableName, outputTableName,
      nativeTimeRangeDelay, haController);

    if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) {
      aggregateColumnName = "HOSTS_COUNT";
    } else {
      aggregateColumnName = "METRIC_COUNT";
    }
  }

  /**
   * Returns an {@link EmptyCondition} whose statement is the full
   * UPSERT-SELECT; setDoUpdate(true) makes the accessor execute it as an
   * update instead of a query.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    EmptyCondition condition = new EmptyCondition();
    condition.setDoUpdate(true);

    /*
    UPSERT INTO METRIC_AGGREGATE_HOURLY (METRIC_NAME, APP_ID, INSTANCE_ID,
    SERVER_TIME, UNITS, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN)
    SELECT METRIC_NAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS,
    SUM(METRIC_SUM), SUM(HOSTS_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN)
    FROM METRIC_AGGREGATE WHERE SERVER_TIME >= 1441155600000 AND
    SERVER_TIME < 1441159200000 GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS;
    */

    condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
      outputTableName, endTime, aggregateColumnName, tableName,
      getDownsampledMetricSkipClause(), startTime, endTime));

    if (LOG.isDebugEnabled()) {
      LOG.debug("Condition: " + condition.toString());
    }

    return condition;
  }

  /**
   * Nothing to read back — the UPSERT-SELECT already wrote the aggregates —
   * so this only logs the completed window.
   */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
    LOG.info("Aggregated cluster metrics for " + outputTableName +
      ", with startTime = " + new Date(startTime) +
      ", endTime = " + new Date(endTime));
  }
}
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java new file mode 100644 index 0000000..1026cbe --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/v2/TimelineMetricFilteringHostAggregator.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.metrics.core.timeline.aggregators.v2;

import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;

/**
 * Host aggregator that skips hosts whose metrics were already aggregated
 * and posted externally (tracked in {@code postedAggregatedMap}), limiting
 * the server-side GROUP BY roll-up to the remaining hosts via a UUID LIKE
 * filter.
 */
public class TimelineMetricFilteringHostAggregator extends TimelineMetricHostAggregator {
  // Resolves hostnames to metric UUID suffixes for the LIKE filter.
  private TimelineMetricMetadataManager metricMetadataManager;
  // hostname -> timestamp of the last externally-posted aggregate for that host.
  private ConcurrentHashMap<String, Long> postedAggregatedMap;

  public TimelineMetricFilteringHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
                                               TimelineMetricMetadataManager metricMetadataManager,
                                               PhoenixHBaseAccessor hBaseAccessor,
                                               Configuration metricsConf,
                                               String checkpointLocation,
                                               Long sleepIntervalMillis,
                                               Integer checkpointCutOffMultiplier,
                                               String hostAggregatorDisabledParam,
                                               String tableName,
                                               String outputTableName,
                                               Long nativeTimeRangeDelay,
                                               MetricCollectorHAController haController,
                                               ConcurrentHashMap<String, Long> postedAggregatedMap) {
    super(aggregatorName,
      hBaseAccessor, metricsConf,
      checkpointLocation,
      sleepIntervalMillis,
      checkpointCutOffMultiplier,
      hostAggregatorDisabledParam,
      tableName,
      outputTableName,
      nativeTimeRangeDelay,
      haController);
    this.metricMetadataManager = metricMetadataManager;
    this.postedAggregatedMap = postedAggregatedMap;
  }

  /**
   * Build the UPSERT SELECT for [startTime, endTime), restricted to hosts
   * that have NOT already been aggregated inside this window.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    // Hosts whose posted timestamp falls inside (startTime, endTime] are done.
    List<String> aggregatedHostnames = new ArrayList<>();
    for (Map.Entry<String, Long> entry : postedAggregatedMap.entrySet()) {
      if (entry.getValue() > startTime && entry.getValue() <= endTime) {
        aggregatedHostnames.add(entry.getKey());
      }
    }
    List<String> notAggregatedHostnames = metricMetadataManager.getNotLikeHostnames(aggregatedHostnames);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Already aggregated hostnames based on postedAggregatedMap : " + aggregatedHostnames);
      LOG.debug("Hostnames that will be aggregated : " + notAggregatedHostnames);
    }
    List<byte[]> uuids = metricMetadataManager.getUuids(new ArrayList<String>(), notAggregatedHostnames, "", "");

    EmptyCondition condition = new EmptyCondition();
    condition.setDoUpdate(true);

    condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
      outputTableName, endTime, tableName,
      getDownsampledMetricSkipClause() + getIncludedUuidsClause(uuids), startTime, endTime));

    if (LOG.isDebugEnabled()) {
      LOG.debug("Condition: " + condition.toString());
    }

    return condition;
  }

  /**
   * Render "(UUID LIKE '%a' OR UUID LIKE '%b') AND " for the given UUID
   * suffixes, or the empty string when there is nothing to filter.
   *
   * Fix: the previous version appended the opening "(" unconditionally, so
   * an empty uuid list yielded a bare "(" and an unparseable SQL statement.
   */
  private String getIncludedUuidsClause(List<byte[]> uuids) {
    if (CollectionUtils.isEmpty(uuids)) {
      return "";
    }

    StringBuilder sb = new StringBuilder();
    sb.append("(");

    //LIKE clause
    // (UUID LIKE ? OR UUID LIKE ?) AND
    for (int i = 0; i < uuids.size(); i++) {
      sb.append("UUID");
      sb.append(" LIKE ");
      sb.append("'%");
      // NOTE(review): new String(byte[]) uses the platform default charset and
      // embeds raw UUID bytes into SQL; confirm the UUID encoding is ASCII-safe.
      sb.append(new String(uuids.get(i)));
      sb.append("'");

      if (i == uuids.size() - 1) {
        sb.append(") AND ");
      } else {
        sb.append(" OR ");
      }
    }
    return sb.toString();
  }
}
package org.apache.ambari.metrics.core.timeline.aggregators.v2;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;

import org.apache.ambari.metrics.core.timeline.aggregators.AbstractTimelineAggregator;
import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.conf.Configuration;
import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;

/**
 * v2 host-level aggregator. All of the work happens server-side in Phoenix
 * through an UPSERT ... SELECT GROUP BY statement, so the aggregate() hook
 * has nothing left to compute and only reports completion.
 */
public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {

  public TimelineMetricHostAggregator(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName,
                                      PhoenixHBaseAccessor hBaseAccessor,
                                      Configuration metricsConf,
                                      String checkpointLocation,
                                      Long sleepIntervalMillis,
                                      Integer checkpointCutOffMultiplier,
                                      String hostAggregatorDisabledParam,
                                      String tableName,
                                      String outputTableName,
                                      Long nativeTimeRangeDelay,
                                      MetricCollectorHAController haController) {
    super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
      sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam,
      tableName, outputTableName, nativeTimeRangeDelay, haController);
  }

  /** Log-only hook: the roll-up was already executed inside Phoenix. */
  @Override
  protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {

    LOG.info("Aggregated host metrics for " + outputTableName +
      ", with startTime = " + new Date(startTime) +
      ", endTime = " + new Date(endTime));
  }

  /**
   * Prepare the UPSERT SELECT over [startTime, endTime); wrapped in an
   * EmptyCondition flagged doUpdate so the executor runs it as an update.
   */
  @Override
  protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
    // Argument order must line up with the %s placeholders in the template.
    String groupBySql = String.format(
      PhoenixTransactSQL.GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
      outputTableName, endTime, tableName,
      getDownsampledMetricSkipClause(), startTime, endTime);

    EmptyCondition groupByCondition = new EmptyCondition();
    groupByCondition.setDoUpdate(true);
    groupByCondition.setStatement(groupBySql);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Condition: " + groupByCondition.toString());
    }

    return groupByCondition;
  }
}
package org.apache.ambari.metrics.core.timeline.availability;

import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.participant.StateMachineEngine;

/**
 * Helix participant for a collector instance. Helix state transitions
 * (see {@link OnlineOfflineStateModelFactory}) flip the ownership flags
 * that tell this instance whether it should run cluster and/or host
 * aggregation; checkpoints are persisted via {@link CheckpointManager}.
 */
public class AggregationTaskRunner {
  private final String instanceName;
  private final String zkAddress;
  private final String clusterName;
  private HelixManager manager;
  private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class);
  private CheckpointManager checkpointManager;
  // Map partition name to an aggregator dimension
  static final Map<String, TimelineMetricAggregator.AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>();
  // Ownership flags to be set by the State transitions
  private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false);
  private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false);

  public enum AGGREGATOR_NAME {
    METRIC_RECORD_MINUTE,
    METRIC_RECORD_HOURLY,
    METRIC_RECORD_DAILY,
    METRIC_AGGREGATE_SECOND,
    METRIC_AGGREGATE_MINUTE,
    METRIC_AGGREGATE_HOURLY,
    METRIC_AGGREGATE_DAILY,
  }

  // Human-readable names, also used as checkpoint znode path components.
  public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>();

  static {
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute");
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly");
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily");
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond");
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute");
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly");
    ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily");

    // Partition name to task assignment
    PARTITION_AGGREGATION_TYPES.put(MetricCollectorHAController.METRIC_AGGREGATORS + "_0", TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER);
    PARTITION_AGGREGATION_TYPES.put(MetricCollectorHAController.METRIC_AGGREGATORS + "_1", TimelineMetricAggregator.AGGREGATOR_TYPE.HOST);
  }

  public AggregationTaskRunner(String instanceName, String zkAddress, String clusterName) {
    this.instanceName = instanceName;
    this.zkAddress = zkAddress;
    this.clusterName = clusterName;
  }

  /**
   * Connect to Helix as a PARTICIPANT, register the Online/Offline state
   * model, and create the checkpoint manager on Helix's property store.
   */
  public void initialize() throws Exception {
    manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
      InstanceType.PARTICIPANT, zkAddress);

    OnlineOfflineStateModelFactory stateModelFactory =
      new OnlineOfflineStateModelFactory(instanceName, this);

    StateMachineEngine stateMach = manager.getStateMachineEngine();
    stateMach.registerStateModelFactory(MetricCollectorHAController.DEFAULT_STATE_MODEL, stateModelFactory);
    manager.connect();

    checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());
  }

  /** @return true if this instance currently owns cluster aggregation. */
  public boolean performsClusterAggregation() {
    return performsClusterAggregation.get();
  }

  /** @return true if this instance currently owns host aggregation. */
  public boolean performsHostAggregation() {
    return performsHostAggregation.get();
  }

  public CheckpointManager getCheckpointManager() {
    return checkpointManager;
  }

  /**
   * Grant an aggregation role to this instance.
   * Guarded against null: the state-model factory resolves the type from
   * PARTITION_AGGREGATION_TYPES, which returns null for unknown partitions;
   * previously that caused an NPE in the switch statement.
   */
  public void setPartitionAggregationFunction(TimelineMetricAggregator.AGGREGATOR_TYPE type) {
    if (type == null) {
      LOG.warn("Ignoring request to set aggregation function of unknown type for : " + instanceName);
      return;
    }
    switch (type) {
      case HOST:
        performsHostAggregation.set(true);
        LOG.info("Set host aggregator function for : " + instanceName);
        break;
      case CLUSTER:
        performsClusterAggregation.set(true);
        LOG.info("Set cluster aggregator function for : " + instanceName);
        break;
      default:
        LOG.warn("Unhandled aggregator type : " + type);
    }
  }

  /**
   * Revoke an aggregation role from this instance (null-safe, see
   * {@link #setPartitionAggregationFunction}).
   */
  public void unsetPartitionAggregationFunction(TimelineMetricAggregator.AGGREGATOR_TYPE type) {
    if (type == null) {
      LOG.warn("Ignoring request to unset aggregation function of unknown type for : " + instanceName);
      return;
    }
    switch (type) {
      case HOST:
        performsHostAggregation.set(false);
        LOG.info("Unset host aggregator function for : " + instanceName);
        break;
      case CLUSTER:
        performsClusterAggregation.set(false);
        LOG.info("Unset cluster aggregator function for : " + instanceName);
        break;
      default:
        LOG.warn("Unhandled aggregator type : " + type);
    }
  }

  /**
   * Disconnect participant before controller shutdown
   */
  void stop() {
    manager.disconnect();
  }
}
package org.apache.ambari.metrics.core.timeline.availability;

import org.I0Itec.zkclient.DataUpdater;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.zookeeper.data.Stat;

/**
 * Persists per-aggregator checkpoint timestamps in ZooKeeper through
 * Helix's property store, replacing local-disk checkpoint files so that
 * any collector instance in the HA cluster can resume aggregation.
 */
public class CheckpointManager {
  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);

  // Field name inside the ZNRecord that stores the checkpoint timestamp.
  static final String ZNODE_FIELD = "checkpoint";
  // Root path under which one znode per aggregator is created.
  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";

  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
    this.propertyStore = propertyStore;
  }

  /**
   * Read aggregator checkpoint from zookeeper
   *
   * @return timestamp, or -1 when no checkpoint znode exists yet
   */
  public long readCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName) {
    String path = getCheckpointZKPath(aggregatorName);
    LOG.debug("Reading checkpoint at " + path);
    Stat stat = new Stat();
    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Stat => " + stat);
    }
    // -1 doubles as both "no znode" and "no field in znode" sentinel.
    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
    LOG.debug("Checkpoint value = " + checkpoint);
    return checkpoint;
  }

  /**
   * Write aggregator checkpoint in zookeeper
   *
   * @param value timestamp
   * @return success
   */
  public boolean writeCheckpoint(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, long value) {
    String path = getCheckpointZKPath(aggregatorName);
    LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
    // update() performs an atomic read-modify-write via the DataUpdater callback.
    return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
  }

  // Callback applied atomically by the property store; creates the record
  // on first write, then overwrites the checkpoint field.
  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
    final String path;
    final Long value;

    public CheckpointDataUpdater(String path, Long value) {
      this.path = path;
      this.value = value;
    }

    @Override
    public ZNRecord update(ZNRecord currentData) {
      if (currentData == null) {
        currentData = new ZNRecord(path);
      }
      currentData.setLongField(ZNODE_FIELD, value);
      return currentData;
    }
  }

  // Builds "/CHECKPOINTS/<ActualAggregatorName>" from the enum's display name.
  String getCheckpointZKPath(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName) {
    StringBuilder sb = new StringBuilder("/");
    sb.append(CHECKPOINT_PATH_PREFIX);
    sb.append("/");
    sb.append(AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
    return sb.toString();
  }
}
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java new file mode 100644 index 0000000..ee28d87 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAController.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.metrics.core.timeline.availability;

import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ambari.metrics.core.timeline.MetricsSystemInitializationException;
import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.StateModelConfigGenerator;

import com.google.common.base.Joiner;

// NOTE(review): stray empty declaration below, harmless but should be removed.
;

/**
 * Bootstraps Helix-based HA for the metrics collector: creates (or joins)
 * the Helix cluster in ZooKeeper, registers this host as a participant,
 * sets up the METRIC_AGGREGATORS resource whose two partitions map to the
 * CLUSTER and HOST aggregation roles, and starts an embedded controller.
 */
public class MetricCollectorHAController {
  private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class);

  static final String CLUSTER_NAME = "ambari-metrics-cluster";
  // Resource with 2 partitions: "_0" -> CLUSTER role, "_1" -> HOST role
  // (see AggregationTaskRunner.PARTITION_AGGREGATION_TYPES).
  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
  static final String DEFAULT_STATE_MODEL = OnlineOfflineSMD.name;
  static final String INSTANCE_NAME_DELIMITER = "_";

  final String zkConnectUrl;
  final String instanceHostname;
  final InstanceConfig instanceConfig;
  final AggregationTaskRunner aggregationTaskRunner;
  final TimelineMetricConfiguration configuration;

  // Cache list of known live instances
  // NOTE(review): mutated from Helix callback threads and read by
  // getLiveInstanceHostNames() without synchronization — confirm callers
  // tolerate a stale/in-flux view.
  final List<String> liveInstanceNames = new ArrayList<>();

  // Helix Admin
  HelixAdmin admin;
  // Helix Manager
  HelixManager manager;

  private volatile boolean isInitialized = false;

  /**
   * Resolve hostname, port and ZK quorum from configuration and prepare the
   * participant's InstanceConfig ("<hostname>_<port>"). Fails fast with
   * MetricsSystemInitializationException on any config error.
   */
  public MetricCollectorHAController(TimelineMetricConfiguration configuration) {
    this.configuration = configuration;
    String instancePort;
    try {
      instanceHostname = configuration.getInstanceHostnameFromEnv();
      instancePort = configuration.getInstancePort();

    } catch (Exception e) {
      LOG.error("Error reading configs from classpath, will resort to defaults.", e);
      // NOTE(review): only the message is propagated; the cause is dropped.
      throw new MetricsSystemInitializationException(e.getMessage());
    }

    try {
      String zkClientPort = configuration.getClusterZKClientPort();
      String zkQuorum = configuration.getClusterZKQuorum();

      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
          + zkClientPort +", quorum = " + zkQuorum);
      }

      zkConnectUrl = configuration.getZkConnectionUrl(zkClientPort, zkQuorum);

    } catch (Exception e) {
      LOG.error("Unable to load hbase-site from classpath.", e);
      throw new MetricsSystemInitializationException(e.getMessage());
    }

    instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort);
    instanceConfig.setHostName(instanceHostname);
    instanceConfig.setPort(instancePort);
    instanceConfig.setInstanceEnabled(true);
    aggregationTaskRunner = new AggregationTaskRunner(
      instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
  }

  /**
   * Name of Helix znode
   */
  public String getClusterName() {
    return CLUSTER_NAME;
  }

  /**
   * Initialize the instance with zookeeper via Helix
   */
  public void initializeHAController() throws Exception {
    String clusterName = getClusterName();
    admin = new ZKHelixAdmin(zkConnectUrl);
    // create cluster (no-op if it already exists since overwrite=false)
    LOG.info("Creating zookeeper cluster node: " + clusterName);
    boolean clusterAdded = admin.addCluster(clusterName, false);
    LOG.info("Was cluster added successfully? " + clusterAdded);

    // Adding host to the cluster: another instance may still be creating the
    // cluster layout, so retry a few times before forcing re-creation below.
    boolean success = false;
    int tries = 5;
    int sleepTimeInSeconds = 5;

    for (int i = 0; i < tries && !success; i++) {
      try {
        List<String> nodes = admin.getInstancesInCluster(clusterName);
        if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
          LOG.info("Adding participant instance " + instanceConfig);
          admin.addInstance(clusterName, instanceConfig);
        }
        success = true;
      } catch (HelixException | ZkNoNodeException ex) {
        LOG.warn("Helix Cluster not yet setup fully.");
        if (i < tries - 1) {
          LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
          TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
        } else {
          LOG.error(ex);
        }
      }
    }

    // Last resort: recreate the cluster (overwrite=true) and register ourselves.
    if (!success) {
      LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
      admin.addCluster(clusterName, true);
      List<String> nodes = admin.getInstancesInCluster(clusterName);
      if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
        LOG.info("Adding participant instance " + instanceConfig);
        admin.addInstance(clusterName, instanceConfig);
      }
    }

    // Add a state model
    if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition(
        StateModelConfigGenerator.generateConfigForOnlineOffline()));
    }

    // Add resources with 1 cluster-wide replica
    // Since our aggregators are unbalanced in terms of work distribution we
    // only need to distribute writes to METRIC_AGGREGATE and
    // METRIC_RECORD_MINUTE
    List<String> resources = admin.getResourcesInCluster(clusterName);
    if (!resources.contains(METRIC_AGGREGATORS)) {
      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas");
      admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString());
    }
    // this will set up the ideal state, it calculates the preference list for
    // each partition similar to consistent hashing
    admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);

    // Start participant
    startAggregators();

    // Start controller
    startController();

    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        aggregationTaskRunner.stop();
        manager.disconnect();
      }
    });

    isInitialized = true;
  }

  /**
   * Return true if HA controller is enabled.
   */
  public boolean isInitialized() {
    return isInitialized;
  }

  // Connect the participant-side task runner; wrap failures into the
  // system-level initialization exception.
  private void startAggregators() {
    try {
      aggregationTaskRunner.initialize();

    } catch (Exception e) {
      LOG.error("Unable to start aggregators.", e);
      throw new MetricsSystemInitializationException(e.getMessage());
    }
  }

  // Start an embedded Helix CONTROLLER on this host and subscribe to
  // live-instance changes for logging/state tracking.
  private void startController() throws Exception {
    manager = HelixManagerFactory.getZKHelixManager(
      getClusterName(),
      instanceHostname,
      InstanceType.CONTROLLER,
      zkConnectUrl
    );

    manager.connect();
    HelixController controller = new HelixController();
    manager.addLiveInstanceChangeListener(controller);
  }

  public AggregationTaskRunner getAggregationTaskRunner() {
    return aggregationTaskRunner;
  }

  /**
   * Hostname portion ("host" from "host_port") of each cached live instance.
   */
  public List<String> getLiveInstanceHostNames() {
    List<String> liveInstanceHostNames = new ArrayList<>();

    for (String instance : liveInstanceNames) {
      liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
    }

    return liveInstanceHostNames;
  }

  /**
   * Controller callback: refreshes the cached live-instance list on
   * membership changes and schedules a delayed cluster-state dump.
   */
  public class HelixController extends GenericHelixController {
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    Joiner joiner = Joiner.on(", ").skipNulls();

    @Override
    public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) {
      super.onLiveInstanceChange(liveInstances, changeContext);

      liveInstanceNames.clear();
      for (LiveInstance instance : liveInstances) {
        liveInstanceNames.add(instance.getInstanceName());
      }

      // NOTE(review): "LiveIsntances" typo in the log message below, left
      // untouched here since it is a runtime string.
      LOG.info("Detected change in liveliness of Collector instances. " +
        "LiveIsntances = " + joiner.join(liveInstanceNames));
      // Print HA state - after some delay, letting the rebalance settle first.
      executorService.schedule(new Runnable() {
        @Override
        public void run() {
          printClusterState();
        }
      }, 30, TimeUnit.SECONDS);


    }
  }

  /** Log a human-readable dump of partition ownership for diagnostics. */
  public void printClusterState() {
    StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################");

    ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
    if (resourceExternalView != null) {
      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
    }
    sb.append("\n##################################################");
    LOG.info(sb.toString());
  }

  // Append "PARTITION -> (instance, state)*" rows, partitions sorted by name.
  private void getPrintableResourceState(ExternalView resourceExternalView,
                                         String resourceName,
                                         StringBuilder sb) {
    TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet());
    sb.append("\nCLUSTER: ");
    sb.append(getClusterName());
    sb.append("\nRESOURCE: ");
    sb.append(resourceName);
    for (String partitionName : sortedSet) {
      sb.append("\nPARTITION: ");
      sb.append(partitionName).append("\t");
      Map<String, String> states = resourceExternalView.getStateMap(partitionName);
      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
        sb.append("\t");
        sb.append(stateEntry.getKey());
        sb.append("\t");
        sb.append(stateEntry.getValue());
      }
    }
  }
}
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java new file mode 100644 index 0000000..78a3199 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/availability/OnlineOfflineStateModelFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.availability; + +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES; + +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; + +public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class); + private final String instanceName; + private final AggregationTaskRunner taskRunner; + + public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) { + this.instanceName = instanceName; + this.taskRunner = taskRunner; + } + + @Override + public StateModel createNewStateModel(String resourceName, String partition) { + LOG.info("Received request to process partition = " + partition + ", for " + + "resource = " + resourceName + ", at " + instanceName); + return new OnlineOfflineStateModel(); + } + + public class OnlineOfflineStateModel extends StateModel { + public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { + String partitionName = message.getPartitionName(); + LOG.info("Received transition to Online from Offline for partition: " + partitionName); + TimelineMetricAggregator.AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); + taskRunner.setPartitionAggregationFunction(type); + } + + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + String partitionName = message.getPartitionName(); + LOG.info("Received transition to Offline from Online for partition: " + partitionName); + 
TimelineMetricAggregator.AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); + taskRunner.unsetPartitionAggregationFunction(type); + } + + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + String partitionName = message.getPartitionName(); + LOG.info("Received transition to Dropped from Offline for partition: " + partitionName); + TimelineMetricAggregator.AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); + taskRunner.unsetPartitionAggregationFunction(type); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java new file mode 100644 index 0000000..7645bd0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricHostMetadata.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.timeline.discovery; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public class TimelineMetricHostMetadata { + //need concurrent data structure, only keys are used. + private ConcurrentHashMap<String, String> hostedApps = new ConcurrentHashMap<>(); + private byte[] uuid; + + // Default constructor + public TimelineMetricHostMetadata() { + } + + public TimelineMetricHostMetadata(ConcurrentHashMap<String, String> hostedApps) { + this.hostedApps = hostedApps; + } + + public TimelineMetricHostMetadata(Set<String> hostedApps) { + ConcurrentHashMap<String, String> appIdsMap = new ConcurrentHashMap<>(); + for (String appId : hostedApps) { + appIdsMap.put(appId, appId); + } + this.hostedApps = appIdsMap; + } + + public ConcurrentHashMap<String, String> getHostedApps() { + return hostedApps; + } + + public void setHostedApps(ConcurrentHashMap<String, String> hostedApps) { + this.hostedApps = hostedApps; + } + + public byte[] getUuid() { + return uuid; + } + + public void setUuid(byte[] uuid) { + this.uuid = uuid; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java new file mode 100644 index 0000000..d308ce1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/discovery/TimelineMetricMetadataKey.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline.discovery; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.commons.lang3.StringUtils; + +@XmlRootElement +public class TimelineMetricMetadataKey { + String metricName; + String appId; + String instanceId; + + public TimelineMetricMetadataKey(String metricName, String appId, String instanceId) { + this.metricName = metricName; + this.appId = appId; + this.instanceId = instanceId; + } + + public String getMetricName() { + return metricName; + } + + public String getAppId() { + return appId; + } + + public String getInstanceId() { + return instanceId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TimelineMetricMetadataKey that = (TimelineMetricMetadataKey) o; + + if (!metricName.equals(that.metricName)) return false; + if (!appId.equals(that.appId)) return false; + return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId)); + } + + @Override + public int hashCode() { + int result = metricName.hashCode(); + result = 31 * result + (appId != null ? appId.hashCode() : 0); + result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "TimelineMetricMetadataKey{" + + "metricName='" + metricName + '\'' + + ", appId='" + appId + '\'' + + ", instanceId='" + instanceId + '\'' + + '}'; + } +}
