http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java new file mode 100644 index 0000000..b357e0b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java @@ -0,0 +1,592 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.ambari.metrics.core.timeline.aggregators.Function; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.apache.ambari.metrics.core.timeline.function.SeriesAggregateFunction; +import org.apache.ambari.metrics.core.timeline.function.TimelineMetricsSeriesAggregateFunction; +import org.apache.ambari.metrics.core.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import org.apache.ambari.metrics.core.timeline.query.ConditionBuilder; +import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; +import org.apache.ambari.metrics.core.timeline.query.TopNCondition; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.AggregationResult; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +public class HBaseTimelineMetricsService extends AbstractService implements TimelineMetricStore { + + static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class); + private final TimelineMetricConfiguration configuration; + private TimelineMetricDistributedCache cache; + private PhoenixHBaseAccessor hBaseAccessor; + private static volatile boolean isInitialized = false; + private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); + private final ConcurrentHashMap<String, Long> postedAggregatedMap = new ConcurrentHashMap<>(); + private TimelineMetricMetadataManager metricMetadataManager; + private Integer defaultTopNHostsLimit; + private MetricCollectorHAController haController; + private boolean containerMetricsDisabled = false; + + /** + * Construct the service. + * + */ + public HBaseTimelineMetricsService(TimelineMetricConfiguration configuration) { + super(HBaseTimelineMetricsService.class.getName()); + this.configuration = configuration; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + initializeSubsystem(); + } + + private TimelineMetricDistributedCache startCacheNode() throws MalformedURLException, URISyntaxException { + //TODO make configurable + return new TimelineMetricsIgniteCache(); + } + + + private synchronized void initializeSubsystem() { + if (!isInitialized) { + hBaseAccessor = new PhoenixHBaseAccessor(null); + // Initialize schema + hBaseAccessor.initMetricSchema(); + // Initialize metadata from store + try { + metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor); + } catch (MalformedURLException | URISyntaxException e) { + throw new ExceptionInInitializerError("Unable to initialize metadata manager"); + } + metricMetadataManager.initializeMetadata(); + // Initialize policies before TTL update + hBaseAccessor.initPoliciesAndTTL(); + // Start HA service + // Start the controller + if (!configuration.isDistributedCollectorModeDisabled()) { + haController = new MetricCollectorHAController(configuration); + try { + haController.initializeHAController(); + } catch (Exception e) { + LOG.error(e); + throw new MetricsSystemInitializationException("Unable to " + + "initialize HA controller", e); + } + } else { + LOG.info("Distributed collector mode disabled"); + } + + //Initialize whitelisting & blacklisting if needed + TimelineMetricsFilter.initializeMetricFilter(configuration); + + Configuration metricsConf = null; + try { + metricsConf = configuration.getMetricsConf(); + } catch (Exception e) { + throw new ExceptionInInitializerError("Cannot initialize configuration."); + } + + if (configuration.isCollectorInMemoryAggregationEnabled()) { + try { + cache = startCacheNode(); + } catch (Exception e) { + throw new MetricsSystemInitializationException("Unable to " + + "start cache node", e); + } + } + + defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT, "20")); + if (Boolean.parseBoolean(metricsConf.get(TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { + LOG.info("Using group by aggregators for aggregating host and cluster metrics."); + } + + // Start the cluster aggregator second + TimelineMetricAggregator secondClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hBaseAccessor, metricsConf, metricMetadataManager, haController, cache); + scheduleAggregatorThread(secondClusterAggregator); + + // Start the minute cluster aggregator + TimelineMetricAggregator minuteClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(minuteClusterAggregator); + + // Start the hourly cluster aggregator + TimelineMetricAggregator hourlyClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(hourlyClusterAggregator); + + // Start the daily cluster aggregator + TimelineMetricAggregator dailyClusterAggregator = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(dailyClusterAggregator); + + // Start the minute host aggregator + if (configuration.isHostInMemoryAggregationEnabled()) { + LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, switching to filtering host minute aggregation on collector"); + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createFilteringTimelineMetricAggregatorMinute( + hBaseAccessor, metricsConf, metricMetadataManager, haController, postedAggregatedMap); + scheduleAggregatorThread(minuteHostAggregator); + } else { + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(minuteHostAggregator); + } + + // Start the hourly host aggregator + TimelineMetricAggregator hourlyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(hourlyHostAggregator); + + // Start the daily host aggregator + TimelineMetricAggregator dailyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(dailyHostAggregator); + + if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { + int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); + int delay = configuration.getTimelineMetricsServiceWatcherDelay(); + // Start the watchdog + watchdogExecutorService.scheduleWithFixedDelay( + new TimelineMetricStoreWatcher(this, configuration), + initDelay, delay, TimeUnit.SECONDS); + LOG.info("Started watchdog for timeline metrics store with initial " + + "delay = " + initDelay + ", delay = " + delay); + } + containerMetricsDisabled = configuration.isContainerMetricsDisabled(); + isInitialized = true; + } + + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + @Override + public TimelineMetrics getTimelineMetrics(List<String> metricNames, + List<String> hostnames, String applicationId, String instanceId, + Long startTime, Long endTime, Precision precision, Integer limit, + boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException { + + if (metricNames == null || metricNames.isEmpty()) { + throw new IllegalArgumentException("No metric name filter specified."); + } + if ((startTime == null && endTime != null) + || (startTime != null && endTime == null)) { + throw new IllegalArgumentException("Open ended query not supported "); + } + if (limit != null && limit > PhoenixHBaseAccessor.RESULTSET_LIMIT){ + throw new IllegalArgumentException("Limit too big"); + } + + TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null; + if (!StringUtils.isEmpty(seriesAggregateFunction)) { + SeriesAggregateFunction func = SeriesAggregateFunction.getFunction(seriesAggregateFunction); + seriesAggrFunctionInstance = TimelineMetricsSeriesAggregateFunctionFactory.newInstance(func); + } + + Multimap<String, List<Function>> metricFunctions = + parseMetricNamesToAggregationFunctions(metricNames); + + List<byte[]> uuids = metricMetadataManager.getUuids(metricFunctions.keySet(), hostnames, applicationId, instanceId); + + ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet())) + .hostnames(hostnames) + .appId(applicationId) + .instanceId(instanceId) + .startTime(startTime) + .endTime(endTime) + .precision(precision) + .limit(limit) + .grouped(groupedByHosts) + .uuid(uuids); + + if (topNConfig != null) { + if (TopNCondition.isTopNHostCondition(metricNames, hostnames) ^ //Only 1 condition should be true. + TopNCondition.isTopNMetricCondition(metricNames, hostnames)) { + conditionBuilder.topN(topNConfig.getTopN()); + conditionBuilder.isBottomN(topNConfig.getIsBottomN()); + Function.ReadFunction readFunction = Function.ReadFunction.getFunction(topNConfig.getTopNFunction()); + Function function = new Function(readFunction, null); + conditionBuilder.topNFunction(function); + } else { + LOG.info("Invalid Input for TopN query. Ignoring TopN Request."); + } + } else if (startTime != null && hostnames != null && hostnames.size() > defaultTopNHostsLimit) { + // if (timeseries query AND hostnames passed AND size(hostnames) > limit) + LOG.info("Requesting data for more than " + defaultTopNHostsLimit + " Hosts. " + + "Defaulting to Top " + defaultTopNHostsLimit); + conditionBuilder.topN(defaultTopNHostsLimit); + conditionBuilder.isBottomN(false); + } + + Condition condition = conditionBuilder.build(); + + TimelineMetrics metrics; + + if (hostnames == null || hostnames.isEmpty()) { + metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions); + } else { + metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions); + } + + metrics = postProcessMetrics(metrics); + + if (metrics.getMetrics().size() == 0) { + return metrics; + } + + return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics); + } + + private TimelineMetrics postProcessMetrics(TimelineMetrics metrics) { + List<TimelineMetric> metricsList = metrics.getMetrics(); + + for (TimelineMetric metric : metricsList){ + String name = metric.getMetricName(); + if (name.contains("._rate")){ + updateValuesAsRate(metric.getMetricValues(), false); + } else if (name.contains("._diff")) { + updateValuesAsRate(metric.getMetricValues(), true); + } + } + + return metrics; + } + + private TimelineMetrics seriesAggregateMetrics(TimelineMetricsSeriesAggregateFunction seriesAggrFuncInstance, + TimelineMetrics metrics) { + if (seriesAggrFuncInstance != null) { + TimelineMetric appliedMetric = seriesAggrFuncInstance.apply(metrics); + metrics.setMetrics(Collections.singletonList(appliedMetric)); + } + return metrics; + } + + static Map<Long, Double> updateValuesAsRate(Map<Long, Double> metricValues, boolean isDiff) { + Long prevTime = null; + Double prevVal = null; + long step; + Double diff; + + for(Iterator<Map.Entry<Long, Double>> it = metricValues.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Long, Double> timeValueEntry = it.next(); + Long currTime = timeValueEntry.getKey(); + Double currVal = timeValueEntry.getValue(); + + if (prevTime != null) { + step = currTime - prevTime; + diff = currVal - prevVal; + if (diff < 0) { + it.remove(); //Discard calculating rate when the metric counter has been reset. + } else { + Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step)); + timeValueEntry.setValue(rate); + } + } else { + it.remove(); + } + + prevTime = currTime; + prevVal = currVal; + } + + return metricValues; + } + + static Multimap<String, List<Function>> parseMetricNamesToAggregationFunctions(List<String> metricNames) { + Multimap<String, List<Function>> metricsFunctions = ArrayListMultimap.create(); + + for (String metricName : metricNames){ + Function function = Function.DEFAULT_VALUE_FUNCTION; + String cleanMetricName = metricName; + + try { + function = Function.fromMetricName(metricName); + int functionStartIndex = metricName.indexOf("._"); + if (functionStartIndex > 0) { + cleanMetricName = metricName.substring(0, functionStartIndex); + } + } catch (Function.FunctionFormatException ffe){ + // unknown function so + // fallback to VALUE, and fullMetricName + } + + List<Function> functionsList = new ArrayList<>(); + functionsList.add(function); + metricsFunctions.put(cleanMetricName, functionsList); + } + + return metricsFunctions; + } + + @Override + public TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException { + // Error indicated by the Sql exception + TimelinePutResponse response = new TimelinePutResponse(); + + hBaseAccessor.insertMetricRecordsWithMetadata(metricMetadataManager, metrics, false); + + if (configuration.isCollectorInMemoryAggregationEnabled()) { + cache.putMetrics(metrics.getMetrics(), metricMetadataManager); + } + + return response; + } + + @Override + public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics) + throws SQLException, IOException { + + if (containerMetricsDisabled) { + LOG.debug("Ignoring submitted container metrics according to configuration. Values will not be stored."); + return new TimelinePutResponse(); + } + + hBaseAccessor.insertContainerMetrics(metrics); + return new TimelinePutResponse(); + } + + @Override + public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern, + boolean includeBlacklistedMetrics) throws SQLException, IOException { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = + metricMetadataManager.getMetadataCache(); + + boolean filterByAppId = StringUtils.isNotEmpty(appId); + boolean filterByMetricName = StringUtils.isNotEmpty(metricPattern); + Pattern metricFilterPattern = null; + if (filterByMetricName) { + metricFilterPattern = Pattern.compile(metricPattern); + } + + // Group Metadata by AppId + Map<String, List<TimelineMetricMetadata>> metadataByAppId = new HashMap<>(); + for (TimelineMetricMetadata metricMetadata : metadata.values()) { + + if (!includeBlacklistedMetrics && !metricMetadata.isWhitelisted()) { + continue; + } + + String currentAppId = metricMetadata.getAppId(); + if (filterByAppId && !currentAppId.equals(appId)) { + continue; + } + + if (filterByMetricName) { + Matcher m = metricFilterPattern.matcher(metricMetadata.getMetricName()); + if (!m.find()) { + continue; + } + } + + List<TimelineMetricMetadata> metadataList = metadataByAppId.get(currentAppId); + if (metadataList == null) { + metadataList = new ArrayList<>(); + metadataByAppId.put(currentAppId, metadataList); + } + + metadataList.add(metricMetadata); + } + + return metadataByAppId; + } + + @Override + public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException { + return metricMetadataManager.getUuid(metricName, appId, instanceId, hostname); + } + + @Override + public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException { + Map<String, TimelineMetricHostMetadata> hostsMetadata = metricMetadataManager.getHostedAppsCache(); + Map<String, Set<String>> hostAppMap = new HashMap<>(); + for (String hostname : hostsMetadata.keySet()) { + hostAppMap.put(hostname, hostsMetadata.get(hostname).getHostedApps().keySet()); + } + return hostAppMap; + } + + @Override + public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException { + Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>(); + String hostname = null; + for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) { + aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate()); + hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname; + break; + } + long timestamp = aggregationResult.getTimeInMilis(); + postedAggregatedMap.put(hostname, timestamp); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Adding host %s to aggregated by in-memory aggregator. Timestamp : %s", hostname, timestamp)); + } + hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME); + + + return new TimelinePutResponse(); + } + + @Override + public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException { + Map<String, Set<String>> hostedApps = getHostAppsMetadata(); + Map<String, Set<String>> instanceHosts = metricMetadataManager.getHostedInstanceCache(); + if (configuration.getTimelineMetricsMultipleClusterSupport()) { + instanceHosts = metricMetadataManager.getHostedInstanceCache(); + } + Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>(); + + if (MapUtils.isEmpty(instanceHosts)) { + Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); + for (String host : hostedApps.keySet()) { + for (String app : hostedApps.get(host)) { + if (!appHostMap.containsKey(app)) { + appHostMap.put(app, new HashSet<String>()); + } + appHostMap.get(app).add(host); + } + } + instanceAppHosts.put("", appHostMap); + } else { + for (String instance : instanceHosts.keySet()) { + + if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) { + continue; + } + Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>(); + instanceAppHosts.put(instance, appHostMap); + + Set<String> hostsWithInstance = instanceHosts.get(instance); + for (String host : hostsWithInstance) { + for (String app : hostedApps.get(host)) { + if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) { + continue; + } + + if (!appHostMap.containsKey(app)) { + appHostMap.put(app, new HashSet<String>()); + } + appHostMap.get(app).add(host); + } + } + } + } + + return instanceAppHosts; + } + + @Override + public List<String> getLiveInstances() { + + List<String> instances = null; + try { + if (haController == null) { + // Always return current host as live (embedded operation mode) + return Collections.singletonList(configuration.getInstanceHostnameFromEnv()); + } + instances = haController.getLiveInstanceHostNames(); + if (instances == null || instances.isEmpty()) { + // fallback + instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv()); + } + } catch (UnknownHostException e) { + LOG.debug("Exception on getting hostname from env.", e); + } + return instances; + } + + private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) { + if (!aggregator.isDisabled()) { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); + } + } + ); + scheduledExecutors.put(aggregator.getName(), executorService); + executorService.scheduleAtFixedRate(aggregator, + 0l, + aggregator.getSleepIntervalMillis(), + TimeUnit.MILLISECONDS); + LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + + + aggregator.getSleepIntervalMillis() + " milliseconds."); + } else { + LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); + } + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsCacheCommitterThread.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsCacheCommitterThread.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsCacheCommitterThread.java new file mode 100644 index 0000000..d0bf137 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsCacheCommitterThread.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class MetricsCacheCommitterThread implements Runnable { + + private static final Log LOG = LogFactory.getLog(MetricsCacheCommitterThread.class); + private static PhoenixHBaseAccessor phoenixHBaseAccessor; + + public MetricsCacheCommitterThread(PhoenixHBaseAccessor phoenixHBaseAccessor) { + this.phoenixHBaseAccessor = phoenixHBaseAccessor; + } + @Override + public void run() { + LOG.debug("Checking if metrics cache is empty"); + if (!phoenixHBaseAccessor.isInsertCacheEmpty()) { + phoenixHBaseAccessor.commitMetricsFromCache(); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsSystemInitializationException.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsSystemInitializationException.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsSystemInitializationException.java new file mode 100644 index 0000000..f24f6e9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/MetricsSystemInitializationException.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +/** + * RuntimeException for initialization of metrics schema. It is RuntimeException + * since this is a not recoverable situation, and should be handled by main or + * service method followed by shutdown. + */ +public class MetricsSystemInitializationException extends RuntimeException { + public MetricsSystemInitializationException() { + } + + public MetricsSystemInitializationException(String msg) { + super(msg); + } + + public MetricsSystemInitializationException(Throwable t) { + super(t); + } + + public MetricsSystemInitializationException(String msg, Throwable t) { + super(msg, t); + } + +}
