Revert "AMBARI-15623. Support distributed aggregation for multiple AMS instances. (swagle)"
This reverts commit dfa4454e7d69fb160924a3877d3f3f1a6314c7bd. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/342c510e Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/342c510e Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/342c510e Branch: refs/heads/branch-2.4 Commit: 342c510e5fd0605c4263a53de83510083857ea9c Parents: 75c1981 Author: Aravindan Vijayan <[email protected]> Authored: Wed May 18 14:08:13 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Thu May 19 13:44:25 2016 -0700 ---------------------------------------------------------------------- .../ambari-metrics-timelineservice/pom.xml | 29 +- .../timeline/HBaseTimelineMetricStore.java | 70 +---- .../timeline/TimelineMetricConfiguration.java | 52 ---- .../metrics/timeline/TimelineMetricStore.java | 6 - .../aggregators/AbstractTimelineAggregator.java | 106 ++----- .../aggregators/TimelineMetricAggregator.java | 26 +- .../TimelineMetricAggregatorFactory.java | 100 +++---- .../TimelineMetricClusterAggregator.java | 9 +- .../TimelineMetricClusterAggregatorSecond.java | 13 +- .../TimelineMetricHostAggregator.java | 11 +- .../v2/TimelineMetricClusterAggregator.java | 11 +- .../v2/TimelineMetricHostAggregator.java | 10 +- .../availability/AggregationTaskRunner.java | 144 ---------- .../availability/CheckpointManager.java | 98 ------- .../OnlineOfflineStateModelFactory.java | 69 ----- .../TimelineMetricHAController.java | 276 ------------------- .../TimelineMetricMetadataManager.java | 4 - .../query/DefaultPhoenixDataSource.java | 2 +- .../webapp/TimelineWebServices.java | 12 - .../timeline/ITPhoenixHBaseAccessor.java | 12 +- .../timeline/TestTimelineMetricStore.java | 5 - .../AbstractTimelineAggregatorTest.java | 7 +- .../aggregators/ITClusterAggregator.java | 20 +- .../aggregators/ITMetricAggregator.java | 19 +- ...melineMetricClusterAggregatorSecondTest.java | 9 +- .../server/upgrade/UpgradeCatalog240.java | 2 +- .../0.1.0/configuration/ams-env.xml | 3 - .../AMBARI_METRICS/0.1.0/metainfo.xml | 2 +- 28 files changed, 143 insertions(+), 984 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index d962b8b..9fc3679 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -34,9 +34,9 @@ <!-- Needed for generating FindBugs warnings using parent pom --> <!--<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>--> <protobuf.version>2.5.0</protobuf.version> - <hadoop.version>[2.7.1.2.5.0.0,2.7.1.2.5.0.0-9999)</hadoop.version> - <phoenix.version>[4.4.0.2.5.0.0,4.4.0.2.5.0.0-9999)</phoenix.version> - <hbase.version>[1.1.2.2.5.0.0,1.1.2.2.5.0.0-9999)</hbase.version> + <hadoop.version>2.7.1.2.3.4.0-3347</hadoop.version> + <phoenix.version>4.4.0.2.3.4.0-3347</phoenix.version> + <hbase.version>1.1.2.2.3.4.0-3347</hbase.version> </properties> <build> @@ -249,25 +249,6 @@ </build> <dependencies> - - <dependency> - <groupId>org.apache.helix</groupId> - <artifactId>helix-core</artifactId> - <version>0.6.5</version> - <exclusions> - <exclusion> - <artifactId>zookeeper</artifactId> - <groupId>org.apache.zookeeper</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <artifactId>zookeeper</artifactId> - <groupId>org.apache.zookeeper</groupId> - <version>3.4.8</version> - </dependency> - <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> @@ -584,10 +565,6 @@ <groupId>org.jruby</groupId> <artifactId>jruby-complete</artifactId> </exclusion> - <exclusion> - <artifactId>zookeeper</artifactId> - <groupId>org.apache.zookeeper</groupId> - </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 974f951..83aa36b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -32,8 +32,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; @@ -43,21 +41,17 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore { @@ -65,10 +59,8 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin private final TimelineMetricConfiguration configuration; private PhoenixHBaseAccessor hBaseAccessor; private static volatile boolean isInitialized = false; - private final ScheduledExecutorService watchdogExecutorService = Executors.newSingleThreadScheduledExecutor(); - private final Map<AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); + private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); private TimelineMetricMetadataManager metricMetadataManager; - private TimelineMetricHAController haController; private Integer defaultTopNHostsLimit; /** @@ -97,18 +89,6 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin metricMetadataManager.initializeMetadata(); // Initialize policies before TTL update hBaseAccessor.initPoliciesAndTTL(); - // Start HA service - if (configuration.isDistributedOperationModeEnabled()) { - // Start the controller - haController = new TimelineMetricHAController(configuration); - try { - haController.initializeHAController(); - } catch (Exception e) { - LOG.error(e); - throw new MetricsSystemInitializationException("Unable to " + - "initialize HA controller", e); - } - } defaultTopNHostsLimit = Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20")); if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) { @@ -117,53 +97,46 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // Start the cluster aggregator second TimelineMetricAggregator secondClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( - hBaseAccessor, metricsConf, metricMetadataManager, haController); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager); scheduleAggregatorThread(secondClusterAggregator); // Start the minute cluster aggregator TimelineMetricAggregator minuteClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( - hBaseAccessor, metricsConf, haController); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf); scheduleAggregatorThread(minuteClusterAggregator); // Start the hourly cluster aggregator TimelineMetricAggregator hourlyClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( - hBaseAccessor, metricsConf, haController); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf); scheduleAggregatorThread(hourlyClusterAggregator); // Start the daily cluster aggregator TimelineMetricAggregator dailyClusterAggregator = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily( - hBaseAccessor, metricsConf, haController); + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf); scheduleAggregatorThread(dailyClusterAggregator); // Start the minute host aggregator TimelineMetricAggregator minuteHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute( - hBaseAccessor, metricsConf, haController); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf); scheduleAggregatorThread(minuteHostAggregator); // Start the hourly host aggregator TimelineMetricAggregator hourlyHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly( - hBaseAccessor, metricsConf, haController); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf); scheduleAggregatorThread(hourlyHostAggregator); // Start the daily host aggregator TimelineMetricAggregator dailyHostAggregator = - TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily( - hBaseAccessor, metricsConf, haController); + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf); scheduleAggregatorThread(dailyHostAggregator); if (!configuration.isTimelineMetricsServiceWatcherDisabled()) { int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay(); int delay = configuration.getTimelineMetricsServiceWatcherDelay(); // Start the watchdog - watchdogExecutorService.scheduleWithFixedDelay( - new TimelineMetricStoreWatcher(this, configuration), - initDelay, delay, TimeUnit.SECONDS); + executorService.scheduleWithFixedDelay( + new TimelineMetricStoreWatcher(this, configuration), initDelay, delay, + TimeUnit.SECONDS); LOG.info("Started watchdog for timeline metrics store with initial " + "delay = " + initDelay + ", delay = " + delay); } @@ -349,30 +322,13 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin return metricMetadataManager.getHostedAppsCache(); } - @Override - public List<String> getLiveInstances() { - return haController.getLiveInstanceHostNames(); - } - - private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator) { + private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) { + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); if (!aggregator.isDisabled()) { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - return new Thread(r, ACTUAL_AGGREGATOR_NAMES.get(aggregator.getName())); - } - } - ); - scheduledExecutors.put(aggregator.getName(), executorService); executorService.scheduleAtFixedRate(aggregator, 0l, aggregator.getSleepIntervalMillis(), TimeUnit.MILLISECONDS); - LOG.info("Scheduled aggregator thread " + aggregator.getName() + " every " + - + aggregator.getSleepIntervalMillis() + " milliseconds."); - } else { - LOG.info("Skipped scheduling " + aggregator.getName() + " since it is disabled."); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 5a04ad2..8e9a8d6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -23,11 +23,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; -import java.net.UnknownHostException; /** * Configuration class that reads properties from ams-site.xml. All values @@ -40,7 +38,6 @@ public class TimelineMetricConfiguration { public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml"; public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml"; - public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml"; public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR = "timeline.metrics.aggregator.checkpoint.dir"; @@ -228,25 +225,16 @@ public class TimelineMetricConfiguration { public static final String TIMELINE_METRICS_TABLES_DURABILITY = "timeline.metrics.tables.durability"; -<<<<<<< HEAD - public static final String TIMELINE_METRIC_METADATA_FILTERS = - "timeline.metrics.service.metadata.filters"; - public static final String HBASE_BLOCKING_STORE_FILES = "hbase.hstore.blockingStoreFiles"; public static final String DEFAULT_TOPN_HOSTS_LIMIT = "timeline.metrics.default.topn.hosts.limit"; -======= ->>>>>>> parent of e3c9816... AMBARI-15902. Refactor Metadata manager for supporting distributed collector. (swagle) public static final String HOST_APP_ID = "HOST"; - public static final String DEFAULT_INSTANCE_PORT = "12001"; - private Configuration hbaseConf; private Configuration metricsConf; - private Configuration amsEnvConf; private volatile boolean isInitialized = false; public void initialize() throws URISyntaxException, MalformedURLException { @@ -273,7 +261,6 @@ public class TimelineMetricConfiguration { hbaseConf.addResource(hbaseResUrl.toURI().toURL()); metricsConf = new Configuration(true); metricsConf.addResource(amsResUrl.toURI().toURL()); - isInitialized = true; } @@ -291,37 +278,6 @@ public class TimelineMetricConfiguration { return metricsConf; } - public String getZKClientPort() throws MalformedURLException, URISyntaxException { - if (!isInitialized) { - initialize(); - } - return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181"); - } - - public String getZKQuorum() throws MalformedURLException, URISyntaxException { - if (!isInitialized) { - initialize(); - } - return hbaseConf.getTrimmed("hbase.zookeeper.quorum"); - } - - public String getInstanceHostnameFromEnv() throws UnknownHostException { - String amsInstanceName = System.getProperty("AMS_INSTANCE_NAME"); - if (amsInstanceName == null) { - amsInstanceName = InetAddress.getLocalHost().getHostName(); - } - return amsInstanceName; - } - - public String getInstancePort() throws MalformedURLException, URISyntaxException { - String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT"); - if (amsInstancePort == null) { - // Check config - return getMetricsConf().get("timeline.metrics.availability.instance.port", DEFAULT_INSTANCE_PORT); - } - return DEFAULT_INSTANCE_PORT; - } - public String getWebappAddress() { String defaultHttpAddress = "0.0.0.0:6188"; if (metricsConf != null) { @@ -379,12 +335,4 @@ public class TimelineMetricConfiguration { } return defaultRpcAddress; } - - public boolean isDistributedOperationModeEnabled() { - try { - return getMetricsConf().get("timeline.metrics.service.operation.mode").equals("distributed"); - } catch (Exception e) { - return false; - } - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java index e37bc4d..3e70330 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java @@ -85,10 +85,4 @@ public interface TimelineMetricStore { * @throws IOException */ Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException; - - /** - * Return a list of known live collector nodes - * @return [ hostname ] - */ - List<String> getLiveInstances(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index ae87cf1..ba7807b 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -20,9 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.slf4j.LoggerFactory; @@ -37,7 +34,6 @@ import java.util.Date; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; /** * Base class for all runnable aggregators. Provides common functions like @@ -56,12 +52,11 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg protected String tableName; protected String outputTableName; protected Long nativeTimeRangeDelay; - protected AggregationTaskRunner taskRunner; // Explicitly name aggregators for logging needs - private final AGGREGATOR_NAME aggregatorName; + private final String aggregatorName; - AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName, + AbstractTimelineAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { this.aggregatorName = aggregatorName; @@ -69,10 +64,10 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg this.metricsConf = metricsConf; this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); - this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName)); + this.LOG = LoggerFactory.getLogger(aggregatorName); } - public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName, + public AbstractTimelineAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -81,8 +76,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg String aggregatorDisableParam, String tableName, String outputTableName, - Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + Long nativeTimeRangeDelay) { this(aggregatorName, hBaseAccessor, metricsConf); this.checkpointLocation = checkpointLocation; this.sleepIntervalMillis = sleepIntervalMillis; @@ -90,9 +84,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg this.aggregatorDisableParam = aggregatorDisableParam; this.tableName = tableName; this.outputTableName = outputTableName; - this.nativeTimeRangeDelay = nativeTimeRangeDelay; - this.taskRunner = haController != null && haController.isInitialized() ? - haController.getAggregationTaskRunner() : null; + this.nativeTimeRangeDelay = nativeTimeRangeDelay; } @Override @@ -106,39 +98,25 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg * Access relaxed for tests */ public void runOnce(Long SLEEP_INTERVAL) { - boolean performAggregationFunction = true; - if (taskRunner != null) { - switch (getAggregatorType()) { - case HOST: - performAggregationFunction = taskRunner.performsHostAggregation(); - break; - case CLUSTER: - performAggregationFunction = taskRunner.performsClusterAggregation(); - } - } - if (performAggregationFunction) { - long currentTime = System.currentTimeMillis(); - long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); + long currentTime = System.currentTimeMillis(); + long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime); - if (lastCheckPointTime != -1) { - LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " - + ((currentTime - lastCheckPointTime) / 1000) - + " seconds."); - - boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); - - if (success) { - try { - saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); - } catch (IOException io) { - LOG.warn("Error saving checkpoint, restarting aggregation at " + - "previous checkpoint."); - } + if (lastCheckPointTime != -1) { + LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " + + ((currentTime - lastCheckPointTime) / 1000) + + " seconds."); + + boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL); + + if (success) { + try { + saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); + } catch (IOException io) { + LOG.warn("Error saving checkpoint, restarting aggregation at " + + "previous checkpoint."); } } - } else { - LOG.info("Skipping aggregation function not owned by this instance."); } } @@ -196,9 +174,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } protected long readCheckPoint() { - if (taskRunner != null) { - return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName); - } try { File checkpoint = new File(getCheckpointLocation()); if (checkpoint.exists()) { @@ -214,23 +189,15 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg } protected void saveCheckPoint(long checkpointTime) throws IOException { - if (taskRunner != null) { - boolean success = taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, checkpointTime); - if (!success) { - LOG.error("Error saving checkpoint with AggregationTaskRunner, " + - "aggregator = " + aggregatorName + "value = " + checkpointTime); + File checkpoint = new File(getCheckpointLocation()); + if (!checkpoint.exists()) { + boolean done = checkpoint.createNewFile(); + if (!done) { + throw new IOException("Could not create checkpoint at location, " + + getCheckpointLocation()); } - } else { - File checkpoint = new File(getCheckpointLocation()); - if (!checkpoint.exists()) { - boolean done = checkpoint.createNewFile(); - if (!done) { - throw new IOException("Could not create checkpoint at location, " + - getCheckpointLocation()); - } - } - FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); } + FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); } /** @@ -350,21 +317,4 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg return currentTime - (currentTime % aggregatorPeriod); } - /** - * Get @AGGREGATOR_TYPE based on the output table. - * This is solely used by the HAController to determine which lock to acquire. - */ - public AGGREGATOR_TYPE getAggregatorType() { - if (outputTableName.contains("RECORD")) { - return AGGREGATOR_TYPE.HOST; - } else if (outputTableName.contains("AGGREGATE")) { - return AGGREGATOR_TYPE.CLUSTER; - } - return null; - } - - @Override - public AGGREGATOR_NAME getName() { - return aggregatorName; - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java index 150e3f1..295db0e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java @@ -1,7 +1,5 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; - /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -22,38 +20,22 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. public interface TimelineMetricAggregator extends Runnable { /** * Aggregate metric data within the time bounds. - * * @param startTime start time millis - * @param endTime end time millis + * @param endTime end time millis * @return success */ - boolean doWork(long startTime, long endTime); + public boolean doWork(long startTime, long endTime); /** * Is aggregator is disabled by configuration. - * * @return true/false */ - boolean isDisabled(); + public boolean isDisabled(); /** * Return aggregator Interval - * * @return Interval in Millis */ - Long getSleepIntervalMillis(); - - /** - * Get aggregator name - * @return @AGGREGATOR_NAME - */ - AGGREGATOR_NAME getName(); + public Long getSleepIntervalMillis(); - /** - * Known aggregator types - */ - enum AGGREGATOR_TYPE { - CLUSTER, - HOST } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index 4c44f9e..cc85c56 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import static java.util.concurrent.TimeUnit.SECONDS; @@ -30,12 +29,12 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; @@ -49,13 +48,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; @@ -94,8 +86,7 @@ public class TimelineMetricAggregatorFactory { * Interval : 5 mins */ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -113,7 +104,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - METRIC_RECORD_MINUTE, + "TimelineMetricHostAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -121,13 +112,12 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } return new TimelineMetricHostAggregator( - METRIC_RECORD_MINUTE, + "TimelineMetricHostAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -135,8 +125,7 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController); + 120000l); } /** @@ -144,8 +133,7 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 hour */ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -163,7 +151,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - METRIC_RECORD_HOURLY, + "TimelineMetricHostAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -171,13 +159,12 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l, - haController + 3600000l ); } return new TimelineMetricHostAggregator( - METRIC_RECORD_HOURLY, + "TimelineMetricHostAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -185,8 +172,7 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l, - haController); + 3600000l); } /** @@ -194,8 +180,7 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 day */ public static TimelineMetricAggregator createTimelineMetricAggregatorDaily - (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -213,7 +198,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricHostAggregator( - METRIC_RECORD_DAILY, + "TimelineMetricHostAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -221,13 +206,12 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l, - haController + 3600000l ); } return new TimelineMetricHostAggregator( - METRIC_RECORD_DAILY, + "TimelineMetricHostAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -235,8 +219,7 @@ public class TimelineMetricAggregatorFactory { hostAggregatorDisabledParam, inputTableName, outputTableName, - 3600000l, - haController); + 3600000l); } /** @@ -246,8 +229,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricMetadataManager metadataManager, - TimelineMetricHAController haController) { + TimelineMetricMetadataManager metadataManager) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -270,7 +252,7 @@ public class TimelineMetricAggregatorFactory { // Second based aggregation have added responsibility of time slicing return new TimelineMetricClusterAggregatorSecond( - METRIC_AGGREGATE_SECOND, + "TimelineClusterAggregatorSecond", metadataManager, hBaseAccessor, metricsConf, checkpointLocation, @@ -280,8 +262,7 @@ public class TimelineMetricAggregatorFactory { inputTableName, outputTableName, 120000l, - timeSliceIntervalMillis, - haController + timeSliceIntervalMillis ); } @@ -290,8 +271,7 @@ public class TimelineMetricAggregatorFactory { * Interval : 5 mins */ public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -311,7 +291,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - METRIC_AGGREGATE_MINUTE, + "TimelineClusterAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -319,13 +299,12 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } return new TimelineMetricClusterAggregator( - METRIC_AGGREGATE_MINUTE, + "TimelineClusterAggregatorMinute", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -333,8 +312,7 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } @@ -343,8 +321,7 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 hour */ public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -364,7 +341,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - METRIC_AGGREGATE_HOURLY, + "TimelineClusterAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -372,13 +349,12 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } return new TimelineMetricClusterAggregator( - METRIC_AGGREGATE_HOURLY, + "TimelineClusterAggregatorHourly", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -386,8 +362,7 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } @@ -396,8 +371,7 @@ public class TimelineMetricAggregatorFactory { * Interval : 1 day */ public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( - PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -417,7 +391,7 @@ public class TimelineMetricAggregatorFactory { if (useGroupByAggregator(metricsConf)) { return new org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.v2.TimelineMetricClusterAggregator( - METRIC_AGGREGATE_DAILY, + "TimelineClusterAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -425,13 +399,12 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } return new TimelineMetricClusterAggregator( - METRIC_AGGREGATE_DAILY, + "TimelineClusterAggregatorDaily", hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, @@ -439,8 +412,7 @@ public class TimelineMetricAggregatorFactory { aggregatorDisabledParam, inputTableName, outputTableName, - 120000l, - haController + 120000l ); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 6438256..f90b01f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -19,8 +19,6 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -38,7 +36,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator private final TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(true); private final boolean isClusterPrecisionInputTable; - public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName, + public TimelineMetricClusterAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -47,12 +45,11 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String hostAggregatorDisabledParam, String inputTableName, String outputTableName, - Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + Long nativeTimeRangeDelay) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, - nativeTimeRangeDelay, haController); + nativeTimeRangeDelay); isClusterPrecisionInputTable = inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME); } http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index 6a16ee6..a8d3086 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -22,12 +22,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; - +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; @@ -41,6 +39,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; /** * Aggregates a metric across all hosts in the cluster. Reads metrics from @@ -55,7 +54,8 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre private final Long serverTimeShiftAdjustment; private final boolean interpolationEnabled; - public TimelineMetricClusterAggregatorSecond(AGGREGATOR_NAME aggregatorName, + + public TimelineMetricClusterAggregatorSecond(String aggregatorName, TimelineMetricMetadataManager metadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, @@ -66,11 +66,10 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre String tableName, String outputTableName, Long nativeTimeRangeDelay, - Long timeSliceInterval, - TimelineMetricHAController haController) { + Long timeSliceInterval) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, - tableName, outputTableName, nativeTimeRangeDelay, haController); + tableName, outputTableName, nativeTimeRangeDelay); appAggregator = new TimelineMetricAppAggregator(metadataManager, metricsConf); this.timeSliceIntervalMillis = timeSliceInterval; http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index 364a4b5..26e73b0 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -22,24 +22,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; - import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; - import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { private static final Log LOG = LogFactory.getLog(TimelineMetricHostAggregator.class); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName, + public TimelineMetricHostAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -48,11 +44,10 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String hostAggregatorDisabledParam, String tableName, String outputTableName, - Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + Long nativeTimeRangeDelay) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, - tableName, outputTableName, nativeTimeRangeDelay, haController); + tableName, outputTableName, nativeTimeRangeDelay); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java index aeddf06..c056d79 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java @@ -20,23 +20,19 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; - import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Date; - import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_AGGREGATED_APP_METRIC_GROUPBY_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator { private final String aggregateColumnName; - public TimelineMetricClusterAggregator(AGGREGATOR_NAME aggregatorName, + public TimelineMetricClusterAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -45,12 +41,11 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String hostAggregatorDisabledParam, String inputTableName, String outputTableName, - Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + Long nativeTimeRangeDelay) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, - nativeTimeRangeDelay, haController); + nativeTimeRangeDelay); if (inputTableName.equals(METRICS_CLUSTER_AGGREGATE_TABLE_NAME)) { aggregateColumnName = "HOSTS_COUNT"; http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java index 0df8329..118c695 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java @@ -20,11 +20,8 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; - import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; @@ -34,7 +31,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { - public TimelineMetricHostAggregator(AGGREGATOR_NAME aggregatorName, + public TimelineMetricHostAggregator(String aggregatorName, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, @@ -43,11 +40,10 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String hostAggregatorDisabledParam, String tableName, String outputTableName, - Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + Long nativeTimeRangeDelay) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, - tableName, outputTableName, nativeTimeRangeDelay, haController); + tableName, outputTableName, nativeTimeRangeDelay); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java deleted file mode 100644 index 4a1f17b..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.participant.StateMachineEngine; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.CLUSTER; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE.HOST; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME; - -public class AggregationTaskRunner { - private final String instanceName; - private final String zkAddress; - private HelixManager manager; - private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class); - private CheckpointManager checkpointManager; - // Map partition name to an aggregator dimension - static final Map<String, AGGREGATOR_TYPE> PARTITION_AGGREGATION_TYPES = new HashMap<>(); - // Ownership flags to be set by the State transitions - private final AtomicBoolean performsClusterAggregation = new AtomicBoolean(false); - private final AtomicBoolean performsHostAggregation = new AtomicBoolean(false); - - public enum AGGREGATOR_NAME { - METRIC_RECORD_MINUTE, - METRIC_RECORD_HOURLY, - METRIC_RECORD_DAILY, - METRIC_AGGREGATE_SECOND, - METRIC_AGGREGATE_MINUTE, - METRIC_AGGREGATE_HOURLY, - METRIC_AGGREGATE_DAILY, - } - - public static final Map<AGGREGATOR_NAME, String> ACTUAL_AGGREGATOR_NAMES = new HashMap<>(); - - static { - ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_MINUTE, "TimelineMetricHostAggregatorMinute"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_HOURLY, "TimelineMetricHostAggregatorHourly"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_RECORD_DAILY, "TimelineMetricHostAggregatorDaily"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_SECOND, "TimelineClusterAggregatorSecond"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_MINUTE, "TimelineClusterAggregatorMinute"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_HOURLY, "TimelineClusterAggregatorHourly"); - ACTUAL_AGGREGATOR_NAMES.put(METRIC_AGGREGATE_DAILY, "TimelineClusterAggregatorDaily"); - - // Partition name to task assignment - PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_0", CLUSTER); - PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST); - } - - public AggregationTaskRunner(String instanceName, String zkAddress) { - this.instanceName = instanceName; - this.zkAddress = zkAddress; - } - - public void initialize() throws Exception { - manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName, - InstanceType.PARTICIPANT, zkAddress); - - OnlineOfflineStateModelFactory stateModelFactory = - new OnlineOfflineStateModelFactory(instanceName, this); - - StateMachineEngine stateMach = manager.getStateMachineEngine(); - stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory); - manager.connect(); - - checkpointManager = new CheckpointManager(manager.getHelixPropertyStore()); - } - - public boolean performsClusterAggregation() { - return performsClusterAggregation.get(); - } - - public boolean performsHostAggregation() { - return performsHostAggregation.get(); - } - - public CheckpointManager getCheckpointManager() { - return checkpointManager; - } - - public void setPartitionAggregationFunction(AGGREGATOR_TYPE type) { - switch (type) { - case HOST: - performsHostAggregation.set(true); - LOG.info("Set host aggregator function for : " + instanceName); - break; - case CLUSTER: - performsClusterAggregation.set(true); - LOG.info("Set cluster aggregator function for : " + instanceName); - } - } - - public void unsetPartitionAggregationFunction(AGGREGATOR_TYPE type) { - switch (type) { - case HOST: - performsHostAggregation.set(false); - LOG.info("Unset host aggregator function for : " + instanceName); - break; - case CLUSTER: - performsClusterAggregation.set(false); - LOG.info("Unset cluster aggregator function for : " + instanceName); - } - } - - /** - * Disconnect participant before controller shutdown - */ - void stop() { - manager.disconnect(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java deleted file mode 100644 index 439102f..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/CheckpointManager.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import org.I0Itec.zkclient.DataUpdater; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.helix.AccessOption; -import org.apache.helix.ZNRecord; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.zookeeper.data.Stat; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES; - -public class CheckpointManager { - private final ZkHelixPropertyStore<ZNRecord> propertyStore; - private static final Log LOG = LogFactory.getLog(CheckpointManager.class); - - static final String ZNODE_FIELD = "checkpoint"; - static final String CHECKPOINT_PATH_PREFIX = "CHECKPOINTS"; - - public CheckpointManager(ZkHelixPropertyStore<ZNRecord> propertyStore) { - this.propertyStore = propertyStore; - } - - /** - * Read aggregator checkpoint from zookeeper - * - * @return timestamp - */ - public long readCheckpoint(AGGREGATOR_NAME aggregatorName) { - String path = getCheckpointZKPath(aggregatorName); - LOG.debug("Reading checkpoint at " + path); - Stat stat = new Stat(); - ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT); - if (LOG.isTraceEnabled()) { - LOG.trace("Stat => " + stat); - } - long checkpoint = znRecord != null ? znRecord.getLongField(ZNODE_FIELD, -1) : -1; - LOG.debug("Checkpoint value = " + checkpoint); - return checkpoint; - } - - /** - * Write aggregator checkpoint in zookeeper - * - * @param value timestamp - * @return sucsess - */ - public boolean writeCheckpoint(AGGREGATOR_NAME aggregatorName, long value) { - String path = getCheckpointZKPath(aggregatorName); - LOG.debug(String.format("Saving checkpoint at %s with value %s", path, value)); - return propertyStore.update(path, new CheckpointDataUpdater(path, value), AccessOption.PERSISTENT); - } - - static class CheckpointDataUpdater implements DataUpdater<ZNRecord> { - final String path; - final Long value; - - public CheckpointDataUpdater(String path, Long value) { - this.path = path; - this.value = value; - } - - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - currentData = new ZNRecord(path); - } - currentData.setLongField(ZNODE_FIELD, value); - return currentData; - } - } - - String getCheckpointZKPath(AGGREGATOR_NAME aggregatorName) { - StringBuilder sb = new StringBuilder("/"); - sb.append(CHECKPOINT_PATH_PREFIX); - sb.append("/"); - sb.append(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName)); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/342c510e/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java deleted file mode 100644 index 7d3350b..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; -import org.apache.helix.NotificationContext; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelFactory; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES; - -public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { - private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class); - private final String instanceName; - private final AggregationTaskRunner taskRunner; - - public OnlineOfflineStateModelFactory(String instanceName, AggregationTaskRunner taskRunner) { - this.instanceName = instanceName; - this.taskRunner = taskRunner; - } - - @Override - public StateModel createNewStateModel(String resourceName, String partition) { - LOG.info("Received request to process partition = " + partition + ", for " + - "resource = " + resourceName + ", at " + instanceName); - return new OnlineOfflineStateModel(); - } - - public class OnlineOfflineStateModel extends StateModel { - public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { - String partitionName = message.getPartitionName(); - LOG.info("Received transition to Online from Offline for partition: " + partitionName); - AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); - taskRunner.setPartitionAggregationFunction(type); - } - - public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - String partitionName = message.getPartitionName(); - LOG.info("Received transition to Offline from Online for partition: " + partitionName); - AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); - taskRunner.unsetPartitionAggregationFunction(type); - } - - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - String partitionName = message.getPartitionName(); - LOG.info("Received transition to Dropped from Offline for partition: " + partitionName); - AGGREGATOR_TYPE type = PARTITION_AGGREGATION_TYPES.get(partitionName); - taskRunner.unsetPartitionAggregationFunction(type); - } - } -} \ No newline at end of file
