AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/14a4f970 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/14a4f970 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/14a4f970 Branch: refs/heads/branch-2.5 Commit: 14a4f97082045969702e3725a318e2a38639a3c3 Parents: 191a628 Author: Aravindan Vijayan <[email protected]> Authored: Mon Nov 14 16:34:46 2016 -0800 Committer: Aravindan Vijayan <[email protected]> Committed: Tue Nov 15 11:02:12 2016 -0800 ---------------------------------------------------------------------- .../ambari-metrics-timelineservice/pom.xml | 23 ++ .../timeline/HBaseTimelineMetricStore.java | 87 ++++-- .../timeline/TimelineMetricConfiguration.java | 46 ++++ .../metrics/timeline/TimelineMetricStore.java | 6 + .../aggregators/AbstractTimelineAggregator.java | 107 +++++-- .../aggregators/TimelineMetricAggregator.java | 26 +- .../TimelineMetricAggregatorFactory.java | 100 ++++--- .../TimelineMetricClusterAggregator.java | 9 +- .../TimelineMetricClusterAggregatorSecond.java | 9 +- .../TimelineMetricHostAggregator.java | 11 +- .../v2/TimelineMetricClusterAggregator.java | 11 +- .../v2/TimelineMetricHostAggregator.java | 10 +- .../availability/AggregationTaskRunner.java | 144 ++++++++++ .../availability/CheckpointManager.java | 98 +++++++ .../OnlineOfflineStateModelFactory.java | 69 +++++ .../TimelineMetricHAController.java | 276 +++++++++++++++++++ .../query/DefaultPhoenixDataSource.java | 2 +- .../webapp/TimelineWebServices.java | 12 + .../timeline/ITPhoenixHBaseAccessor.java | 12 +- .../timeline/TestTimelineMetricStore.java | 5 + .../AbstractTimelineAggregatorTest.java | 6 +- .../aggregators/ITClusterAggregator.java | 20 +- .../aggregators/ITMetricAggregator.java | 19 +- ...melineMetricClusterAggregatorSecondTest.java | 14 +- .../TimelineMetricHAControllerTest.java | 107 +++++++ .../server/upgrade/UpgradeCatalog240.java | 2 +- .../0.1.0/configuration/ams-env.xml | 3 + 
.../AMBARI_METRICS/0.1.0/metainfo.xml | 2 +- .../server/upgrade/UpgradeCatalog240Test.java | 96 ++++--- 29 files changed, 1134 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index fdbfdcb..fa5d991 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -268,6 +268,25 @@ </build> <dependencies> + + <dependency> + <groupId>org.apache.helix</groupId> + <artifactId>helix-core</artifactId> + <version>0.6.5</version> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + <version>3.4.8</version> + </dependency> + <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> @@ -589,6 +608,10 @@ <groupId>org.jruby</groupId> <artifactId>jruby-complete</artifactId> </exclusion> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 686533b..933bdf0 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction; @@ -43,7 +45,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition; - import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; @@ -56,10 +57,13 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.ThreadFactory; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore { @@ -67,9 +71,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin private final TimelineMetricConfiguration configuration; private PhoenixHBaseAccessor hBaseAccessor; private static volatile boolean isInitialized = false; - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + 
private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final Map<AggregationTaskRunner.AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); private TimelineMetricMetadataManager metricMetadataManager; private Integer defaultTopNHostsLimit; + private TimelineMetricHAController haController; /** * Construct the service. @@ -97,6 +103,18 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin metricMetadataManager.initializeMetadata(); // Initialize policies before TTL update hBaseAccessor.initPoliciesAndTTL(); + // Start HA service + if (configuration.isDistributedOperationModeEnabled()) { + // Start the controller + haController = new TimelineMetricHAController(configuration); + try { + haController.initializeHAController(); + } catch (Exception e) { + LOG.error(e); + throw new MetricsSystemInitializationException("Unable to " + + "initialize HA controller", e); + } + } String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, ""); if (!StringUtils.isEmpty(whitelistFile)) { @@ -110,44 +128,51 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // Start the cluster aggregator second TimelineMetricAggregator secondClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); - scheduleAggregatorThread(secondClusterAggregator); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hBaseAccessor, metricsConf, metricMetadataManager, haController); + scheduleAggregatorThread(secondClusterAggregator, metricsConf); // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); - scheduleAggregatorThread(minuteClusterAggregator); + 
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(minuteClusterAggregator, metricsConf); // Start the hourly cluster aggregator TimelineMetricAggregator hourlyClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf); - scheduleAggregatorThread(hourlyClusterAggregator); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(hourlyClusterAggregator, metricsConf); // Start the daily cluster aggregator TimelineMetricAggregator dailyClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf); - scheduleAggregatorThread(dailyClusterAggregator); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(dailyClusterAggregator, metricsConf); // Start the minute host aggregator TimelineMetricAggregator minuteHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf); - scheduleAggregatorThread(minuteHostAggregator); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(minuteHostAggregator, metricsConf); // Start the hourly host aggregator TimelineMetricAggregator hourlyHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf); - scheduleAggregatorThread(hourlyHostAggregator); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(hourlyHostAggregator, metricsConf); // Start the daily host aggregator TimelineMetricAggregator dailyHostAggregator = - 
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf); - scheduleAggregatorThread(dailyHostAggregator); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( + hBaseAccessor, metricsConf, haController); + scheduleAggregatorThread(dailyHostAggregator, metricsConf); if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); int delay = configuration.getTimelineMetricsServiceWatcherDelay(); // Start the watchdog - executorService.scheduleWithFixedDelay( + watchdogExecutorService.scheduleWithFixedDelay( new TimelineMetricStoreWatcher(this, configuration), initDelay, delay, TimeUnit.SECONDS); LOG.info("Started watchdog for timeline metrics store with initial " + @@ -357,13 +382,31 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin return metricMetadataManager.getHostedAppsCache(); } - private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + @Override + public List<String> getLiveInstances() { + return haController.getLiveInstanceHostNames(); + } + + private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator, + Configuration metricsConf) { if (!aggregator.isDisabled()) { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); + } + } + ); + scheduledExecutors.put(aggregator.getName(), executorService); executorService.scheduleAtFixedRate(aggregator, - 0l, + SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)), aggregator.getSleepIntervalMillis(), TimeUnit.MILLISECONDS); + LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + + + 
aggregator.getSleepIntervalMillis() + " milliseconds."); + } else { + LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 681b353..c4a3fee 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -23,9 +23,11 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; +import java.net.UnknownHostException; /** * Configuration class that reads properties from ams-site.xml. 
All values @@ -38,6 +40,7 @@ public class TimelineMetricConfiguration { public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml"; public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; + public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml"; public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR = "timeline.metrics.aggregator.checkpoint.dir"; @@ -251,8 +254,11 @@ public class TimelineMetricConfiguration { public static final String HOST_APP_ID = "HOST"; + public static final String DEFAULT_INSTANCE_PORT = "12001"; + private Configuration hbaseConf; private Configuration metricsConf; + private Configuration amsEnvConf; private volatile boolean isInitialized = false; public void initialize() throws URISyntaxException, MalformedURLException { @@ -279,6 +285,7 @@ public class TimelineMetricConfiguration { hbaseConf.addResource(hbaseResUrl.toURI().toURL()); metricsConf = new Configuration(true); metricsConf.addResource(amsResUrl.toURI().toURL()); + isInitialized = true; } @@ -296,6 +303,37 @@ public class TimelineMetricConfiguration { return metricsConf; } + public String getZKClientPort() throws MalformedURLException, URISyntaxException { + if (!isInitialized) { + initialize(); + } + return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181"); + } + + public String getZKQuorum() throws MalformedURLException, URISyntaxException { + if (!isInitialized) { + initialize(); + } + return hbaseConf.getTrimmed("hbase.zookeeper.quorum"); + } + + public String getInstanceHostnameFromEnv() throws UnknownHostException { + String amsInstanceName = System.getProperty("AMS_INSTANCE_NAME"); + if (amsInstanceName == null) { + amsInstanceName = InetAddress.getLocalHost().getHostName(); + } + return amsInstanceName; + } + + public String getInstancePort() throws MalformedURLException, URISyntaxException { + String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT"); + if (amsInstancePort == 
null) { + // Check config + return getMetricsConf().get("timeline.metrics.availability.instance.port", DEFAULT_INSTANCE_PORT); + } + return DEFAULT_INSTANCE_PORT; + } + public String getWebappAddress() { String defaultHttpAddress = "0.0.0.0:6188"; if (metricsConf != null) { @@ -353,4 +391,12 @@ public class TimelineMetricConfiguration { } return defaultRpcAddress; } + + public boolean isDistributedOperationModeEnabled() { + try { + return getMetricsConf().get("timeline.metrics.service.operation.mode").equals("distributed"); + } catch (Exception e) { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index 3a8dc55..15644ed 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -87,4 +87,10 @@ public interface TimelineMetricStore { * @throws IOException */ Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException; + + /** + * Return a list of known live collector nodes + * @return [ hostname ] + */ + List<String> getLiveInstances(); } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index b1b1822..04f5c1c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -22,6 +22,9 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; 
@@ -40,6 +43,7 @@ import java.util.List; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; /** * Base class for all runnable aggregators. Provides common functions like @@ -58,13 +62,14 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected String tableName; protected String outputTableName; protected Long nativeTimeRangeDelay; + protected AggregationTaskRunner taskRunner; protected List<String> downsampleMetricPatterns; protected List<CustomDownSampler> configuredDownSamplers; // Explicitly name aggregators for logging needs - private final String aggregatorName; + private final AGGREGATOR_NAME aggregatorName; - AbstractTimelineAggregator(String aggregatorName, + AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { this.aggregatorName = aggregatorName; @@ -72,12 +77,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg this.metricsConf = metricsConf; this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); - this.LOG = LoggerFactory.getLogger(aggregatorName); + this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName)); this.configuredDownSamplers = DownSamplerUtils.getDownSamplers(metricsConf); this.downsampleMetricPatterns = DownSamplerUtils.getDownsampleMetricPatterns(metricsConf); } - public AbstractTimelineAggregator(String aggregatorName, + public 
AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -86,7 +91,8 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg String aggregatorDisableParam, String tableName, String outputTableName, - Long nativeTimeRangeDelay) { + Long nativeTimeRangeDelay, + TimelineMetricHAController haController) { this(aggregatorName, hBaseAccessor, metricsConf); this.checkpointLocation = checkpointLocation; this.sleepIntervalMillis = sleepIntervalMillis; @@ -94,7 +100,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg this.aggregatorDisableParam = aggregatorDisableParam; this.tableName = tableName; this.outputTableName = outputTableName; - this.nativeTimeRangeDelay = nativeTimeRangeDelay; + this.nativeTimeRangeDelay = nativeTimeRangeDelay; + this.taskRunner = haController != null && haController.isInitialized() ? + haController.getAggregationTaskRunner() : null; } @Override @@ -108,25 +116,39 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg * Access relaxed for tests */ public void runOnce(Long SLEEP_INTERVAL) { + boolean performAggregationFunction = true; + if (taskRunner != null) { + switch (getAggregatorType()) { + case HOST: + performAggregationFunction = taskRunner.performsHostAggregation(); + break; + case CLUSTER: + performAggregationFunction = taskRunner.performsClusterAggregation(); + } + } - long currentTime = System.currentTimeMillis(); - long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); - - if (lastCheckPointTime != -1) { - LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " - + ((currentTime - lastCheckPointTime) / 1000) - + " seconds."); - - boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); + if (performAggregationFunction) { + long currentTime = System.currentTimeMillis(); + long lastCheckPointTime 
= readLastCheckpointSavingOnFirstRun(currentTime); - if (success) { - try { - saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); - } catch (IOException io) { - LOG.warn("Error saving checkpoint, restarting aggregation at " + - "previous checkpoint."); + if (lastCheckPointTime != -1) { + LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " + + ((currentTime - lastCheckPointTime) / 1000) + + " seconds."); + + boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); + + if (success) { + try { + saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); + } catch (IOException io) { + LOG.warn("Error saving checkpoint, restarting aggregation at " + + "previous checkpoint."); + } } } + } else { + LOG.info("Skipping aggregation function not owned by this instance."); } } @@ -184,6 +206,9 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } protected long readCheckPoint() { + if (taskRunner != null) { + return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName); + } try { File checkpoint = new File(getCheckpointLocation()); if (checkpoint.exists()) { @@ -199,15 +224,23 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } protected void saveCheckPoint(long checkpointTime) throws IOException { - File checkpoint = new File(getCheckpointLocation()); - if (!checkpoint.exists()) { - boolean done = checkpoint.createNewFile(); - if (!done) { - throw new IOException("Could not create checkpoint at location, " + - getCheckpointLocation()); + if (taskRunner != null) { + boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime); + if (!success) { + LOG.error("Error saving checkpoint with AggregationTaskRunner, " + + "aggregator = " + aggregatorName + "value = " + checkpointTime); } + } else { + File checkpoint = new File(getCheckpointLocation()); + if (!checkpoint.exists()) { + boolean done = checkpoint.createNewFile(); + if 
(!done) { + throw new IOException("Could not create checkpoint at location, " + + getCheckpointLocation()); + } + } + FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); } - FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); } /** @@ -365,6 +398,24 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } /** + * Get @AGGREGATOR_TYPE based on the output table. + * This is solely used by the HAController to determine which lock to acquire. + */ + public AGGREGATOR_TYPE getAggregatorType() { + if (outputTableName.contains("RECORD")) { + return AGGREGATOR_TYPE.HOST; + } else if (outputTableName.contains("AGGREGATE")) { + return AGGREGATOR_TYPE.CLUSTER; + } + return null; + } + + @Override + public AGGREGATOR_NAME getName() { + return aggregatorName; + } + + /** * Run 1 downsampler query. * @param conn * @param condition http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java index 295db0e..150e3f1 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java @@ -1,5 +1,7 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,22 +22,38 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline public interface TimelineMetricAggregator extends Runnable { /** * Aggregate metric data within the time bounds. + * * @param startTime start time millis - * @param endTime end time millis + * @param endTime end time millis * @return success */ - public boolean doWork(long startTime, long endTime); + boolean doWork(long startTime, long endTime); /** * Is aggregator is disabled by configuration. + * * @return true/false */ - public boolean isDisabled(); + boolean isDisabled(); /** * Return aggregator Interval + * * @return Interval in Millis */ - public Long getSleepIntervalMillis(); + Long getSleepIntervalMillis(); + + /** + * Get aggregator name + * @return @AGGREGATOR_NAME + */ + AGGREGATOR_NAME getName(); + /** + * Known aggregator types + */ + enum AGGREGATOR_TYPE { + CLUSTER, + HOST } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index cc85c56..4c44f9e 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import static java.util.concurrent.TimeUnit.SECONDS; @@ -29,12 +30,12 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; @@ -48,6 +49,13 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; @@ -86,7 +94,8 @@ public class TimelineMetricAggregatorFactory { * Interval : 5 mins */ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + (PhoenixHBaseAccessor hBaseAccessor, 
Configuration metricsConf, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -104,7 +113,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - "TimelineMetricHostAggregatorMinute", + METRIC_RECORD_MINUTE, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -112,12 +121,13 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } return new TimelineMetricHostAggregator( - "TimelineMetricHostAggregatorMinute", + METRIC_RECORD_MINUTE, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -125,7 +135,8 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 120000l); + 120000l, + haController); } /** @@ -133,7 +144,8 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 hour */ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -151,7 +163,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - "TimelineMetricHostAggregatorHourly", + METRIC_RECORD_HOURLY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -159,12 +171,13 @@ public class TimelineMetricAggregatorFactory { 
hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l + 3600000l, + haController ); } return new TimelineMetricHostAggregator( - "TimelineMetricHostAggregatorHourly", + METRIC_RECORD_HOURLY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -172,7 +185,8 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l); + 3600000l, + haController); } /** @@ -180,7 +194,8 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 day */ public static TimelineMetricAggregator createTimelineMetricAggregatorDaily - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -198,7 +213,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - "TimelineMetricHostAggregatorDaily", + METRIC_RECORD_DAILY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -206,12 +221,13 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l + 3600000l, + haController ); } return new TimelineMetricHostAggregator( - "TimelineMetricHostAggregatorDaily", + METRIC_RECORD_DAILY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -219,7 +235,8 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l); + 3600000l, + haController); } /** @@ -229,7 +246,8 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( PhoenixHBaseAccessor hBaseAccessor, 
Configuration metricsConf, - TimelineMetricMetadataManager metadataManager) { + TimelineMetricMetadataManager metadataManager, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -252,7 +270,7 @@ public class TimelineMetricAggregatorFactory { // Second based aggregation have added responsibility of time slicing return new TimelineMetricClusterAggregatorSecond( - "TimelineClusterAggregatorSecond", + METRIC_AGGREGATE_SECOND, metadataManager, hBaseAccessor, metricsConf, checkpointLocation, @@ -262,7 +280,8 @@ public class TimelineMetricAggregatorFactory { inputTableName, outputTableName, 120000l, - timeSliceIntervalMillis + timeSliceIntervalMillis, + haController ); } @@ -271,7 +290,8 @@ public class TimelineMetricAggregatorFactory { * Interval : 5 mins */ public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -291,7 +311,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - "TimelineClusterAggregatorMinute", + METRIC_AGGREGATE_MINUTE, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -299,12 +319,13 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } return new TimelineMetricClusterAggregator( - "TimelineClusterAggregatorMinute", + METRIC_AGGREGATE_MINUTE, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -312,7 +333,8 @@ public class 
TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } @@ -321,7 +343,8 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 hour */ public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -341,7 +364,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - "TimelineClusterAggregatorHourly", + METRIC_AGGREGATE_HOURLY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -349,12 +372,13 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } return new TimelineMetricClusterAggregator( - "TimelineClusterAggregatorHourly", + METRIC_AGGREGATE_HOURLY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -362,7 +386,8 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } @@ -371,7 +396,8 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 day */ public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, + TimelineMetricHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -391,7 +417,7 @@ public class TimelineMetricAggregatorFactory { 
if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - "TimelineClusterAggregatorDaily", + METRIC_AGGREGATE_DAILY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -399,12 +425,13 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } return new TimelineMetricClusterAggregator( - "TimelineClusterAggregatorDaily", + METRIC_AGGREGATE_DAILY, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -412,7 +439,8 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l + 120000l, + haController ); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index f90b01f..6438256 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -19,6 +19,8 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -36,7 +38,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true); private final boolean isClusterPrecisionInputTable; - public TimelineMetricClusterAggregator(String aggregatorName, + public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -45,11 +47,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String hostAggregatorDisabledParam, String inputTableName, String outputTableName, - Long nativeTimeRangeDelay) { + Long nativeTimeRangeDelay, + TimelineMetricHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, - nativeTimeRangeDelay); + nativeTimeRangeDelay, haController); isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME); } 
http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index b338a70..98b3987 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -25,6 +25,8 @@ import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; @@ -62,7 +64,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre private String skipAggrPatternStrings; - public TimelineMetricClusterAggregatorSecond(String aggregatorName, + public TimelineMetricClusterAggregatorSecond(AggregationTaskRunner.AGGREGATOR_NAME aggregatorName, TimelineMetricMetadataManager metadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, @@ -73,10 +75,11 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre String tableName, String outputTableName, Long nativeTimeRangeDelay, - Long timeSliceInterval) { + Long timeSliceInterval, + TimelineMetricHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, - tableName, outputTableName, nativeTimeRangeDelay); + tableName, outputTableName, nativeTimeRangeDelay, haController); this.metadataManagerInstance = metadataManager; appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf); http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index 26e73b0..364a4b5 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -22,20 +22,24 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; + import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; + import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - public TimelineMetricHostAggregator(String aggregatorName, + public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -44,10 +48,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String hostAggregatorDisabledParam, String tableName, String 
outputTableName, - Long nativeTimeRangeDelay) { + Long nativeTimeRangeDelay, + TimelineMetricHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, - tableName, outputTableName, nativeTimeRangeDelay); + tableName, outputTableName, nativeTimeRangeDelay, haController); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java index 1e4b4ad..bbb9991 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java @@ -20,19 +20,23 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; +import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; + import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; + import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { private final String aggregateColumnName; - public TimelineMetricClusterAggregator(String aggregatorName, + public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -41,11 +45,12 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String hostAggregatorDisabledParam, String inputTableName, String outputTableName, - Long nativeTimeRangeDelay) { + Long nativeTimeRangeDelay, + TimelineMetricHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, - nativeTimeRangeDelay); + nativeTimeRangeDelay, haController); if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) { aggregateColumnName = "HOSTS_COUNT"; 
http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java index 59d5097..c071708 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java @@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; + import java.io.IOException; import java.sql.ResultSet; import 
java.sql.SQLException; @@ -31,7 +34,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { - public TimelineMetricHostAggregator(String aggregatorName, + public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -40,10 +43,11 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String hostAggregatorDisabledParam, String tableName, String outputTableName, - Long nativeTimeRangeDelay) { + Long nativeTimeRangeDelay, + TimelineMetricHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, - tableName, outputTableName, nativeTimeRangeDelay); + tableName, outputTableName, nativeTimeRangeDelay, haController); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java new file mode 100644 index 0000000..fcd26bd --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.participant.StateMachineEngine; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; +import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME; + +public class AggregationTaskRunner { + private final String instanceName; + private final String zkAddress; + private HelixManager manager; + private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class); + private CheckpointManager checkpointManager; + // Map partition name to an aggregator dimension + static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>(); + // Ownership flags to be set by the State transitions + private AtomicBoolean performsClusterAggregation = new AtomicBoolean(false); + private AtomicBoolean performsHostAggregation = new AtomicBoolean(false); + + public enum AGGREGATOR_NAME { + METRIC_RECORD_MINUTE, + METRIC_RECORD_HOURLY, + METRIC_RECORD_DAILY, + METRIC_AGGREGATE_SECOND, + 
METRIC_AGGREGATE_MINUTE, + METRIC_AGGREGATE_HOURLY, + METRIC_AGGREGATE_DAILY, + } + + public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>(); + + static { + ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute"); + ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly"); + ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily"); + ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond"); + ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute"); + ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly"); + ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily"); + + // Partition name to task assignment + PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER); + PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST); + } + + public AggregationTaskRunner(String instanceName, String zkAddress) { + this.instanceName = instanceName; + this.zkAddress = zkAddress; + } + + public void initialize() throws Exception { + manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName, + InstanceType.PARTICIPANT, zkAddress); + + OnlineOfflineStateModelFactory stateModelFactory = + new OnlineOfflineStateModelFactory(instanceName, this); + + StateMachineEngine stateMach = manager.getStateMachineEngine(); + stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory); + manager.connect(); + + checkpointManager = new CheckpointManager(manager.getHelixPropertyStore()); + } + + public boolean performsClusterAggregation() { + return performsClusterAggregation.get(); + } + + public boolean performsHostAggregation() { + return performsHostAggregation.get(); + } + + public CheckpointManager getCheckpointManager() { + return checkpointManager; + } + + public void 
setPartitionAggregationFunction(AGGREGATOR_TYPE type) { + switch (type) { + case HOST: + performsHostAggregation.set(true); + LOG.info("Set host aggregator function for : " + instanceName); + break; + case CLUSTER: + performsClusterAggregation.set(true); + LOG.info("Set cluster aggregator function for : " + instanceName); + } + } + + public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) { + switch (type) { + case HOST: + performsHostAggregation.set(false); + LOG.info("Unset host aggregator function for : " + instanceName); + break; + case CLUSTER: + performsClusterAggregation.set(false); + LOG.info("Unset cluster aggregator function for : " + instanceName); + } + } + + /** + * Disconnect participant before controller shutdown + */ + void stop() { + manager.disconnect(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java new file mode 100644 index 0000000..439102f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;

import org.I0Itec.zkclient.DataUpdater;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.helix.AccessOption;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.zookeeper.data.Stat;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;

/**
 * Reads and writes per-aggregator checkpoints through a Helix property
 * store. Each checkpoint is a long stored in the {@link #ZNODE_FIELD} field
 * of the record at {@code /CHECKPOINTS/<aggregator name>}.
 */
public class CheckpointManager {
  private final ZkHelixPropertyStore<ZNRecord> propertyStore;
  private static final Log LOG = LogFactory.getLog(CheckpointManager.class);

  static final String ZNODE_FIELD = "checkpoint";
  static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS";

  /**
   * @param propertyStore store used for all checkpoint reads and updates
   */
  public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) {
    this.propertyStore = propertyStore;
  }

  /**
   * Read aggregator checkpoint from zookeeper
   *
   * @param aggregatorName aggregator whose checkpoint is requested
   * @return timestamp, or -1 when no record is returned for the aggregator
   */
  public long readCheckpoint(AGGREGATOR_NAME aggregatorName) {
    String path = getCheckpointZKPath(aggregatorName);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Reading checkpoint at " + path);
    }
    Stat stat = new Stat();
    ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
    if (LOG.isTraceEnabled()) {
      LOG.trace("Stat => " + stat);
    }
    // -1 doubles as the "no checkpoint stored" marker and the field default.
    long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Checkpoint value = " + checkpoint);
    }
    return checkpoint;
  }

  /**
   * Write aggregator checkpoint in zookeeper
   *
   * @param aggregatorName aggregator whose checkpoint is written
   * @param value timestamp
   * @return success, as reported by the property store update
   */
  public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) {
    String path = getCheckpointZKPath(aggregatorName);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value));
    }
    return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT);
  }

  /**
   * Updater that stores the checkpoint value in the current record, creating
   * a new record keyed by the path when none exists yet.
   */
  static class CheckpointDataUpdater implements DataUpdater<ZNRecord> {
    final String path;
    final long value;

    /**
     * @param path  path used as the id of a newly created record
     * @param value checkpoint to store; must not be null
     * @throws IllegalArgumentException if {@code value} is null
     */
    public CheckpointDataUpdater(String path, Long value) {
      // Reject null here instead of failing on unboxing inside update().
      if (value == null) {
        throw new IllegalArgumentException("Checkpoint value must not be null for path " + path);
      }
      this.path = path;
      this.value = value;
    }

    @Override
    public ZNRecord update(ZNRecord currentData) {
      ZNRecord record = currentData != null ? currentData : new ZNRecord(path);
      record.setLongField(ZNODE_FIELD, value);
      return record;
    }
  }

  /**
   * Builds the checkpoint path for an aggregator:
   * {@code /CHECKPOINTS/<name from ACTUAL_AGGREGATOR_NAMES>}.
   *
   * @param aggregatorName aggregator to look up
   * @return path of the aggregator's checkpoint record
   */
  String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) {
    return "/" + CHECKPOINT_PATH_PREFIX + "/" + ACTUAL_AGGREGATOR_NAMES.get(aggregatorName);
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/14a4f970/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java new file mode 100644 index 0000000..7d3350b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES; + +public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class); + private final String instanceName; + private final AggregationTaskRunner taskRunner; + + public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) { + this.instanceName = instanceName; + this.taskRunner = taskRunner; + } + + @Override + public StateModel createNewStateModel(String resourceName, String partition) { + LOG.info("Received request to process partition = " + partition + ", for " + + "resource = " + resourceName + ", at " + instanceName); + return new OnlineOfflineStateModel(); + } + + public class OnlineOfflineStateModel extends StateModel { + public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { + String partitionName = message.getPartitionName(); + LOG.info("Received transition to Online from Offline for partition: " + partitionName); + AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); + taskRunner.setPartitionAggregationFunction(type); + } + + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + String partitionName = message.getPartitionName(); + LOG.info("Received 
transition to Offline from Online for partition: " + partitionName); + AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); + taskRunner.unsetPartitionAggregationFunction(type); + } + + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + String partitionName = message.getPartitionName(); + LOG.info("Received transition to Dropped from Offline for partition: " + partitionName); + AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); + taskRunner.unsetPartitionAggregationFunction(type); + } + } +} \ No newline at end of file
