http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java index 933bdf0..034fb2f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction; @@ -45,7 +45,9 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition; + import java.io.IOException; +import java.net.UnknownHostException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -56,8 +58,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT; @@ -75,7 +78,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin private final Map<AggregationTaskRunner.AGGREGATOR_NAME, ScheduledExecutorService> scheduledExecutors = new HashMap<>(); private TimelineMetricMetadataManager metricMetadataManager; private Integer defaultTopNHostsLimit; - private TimelineMetricHAController haController; + private MetricCollectorHAController haController; /** * Construct the service. 
@@ -106,7 +109,7 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin // Start HA service if (configuration.isDistributedOperationModeEnabled()) { // Start the controller - haController = new TimelineMetricHAController(configuration); + haController = new MetricCollectorHAController(configuration); try { haController.initializeHAController(); } catch (Exception e) { @@ -384,7 +387,16 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin @Override public List<String> getLiveInstances() { - return haController.getLiveInstanceHostNames(); + List<String> instances = haController.getLiveInstanceHostNames(); + if (instances == null || instances.isEmpty()) { + try { + // Always return current host as live (embedded operation mode) + instances = Collections.singletonList(configuration.getInstanceHostnameFromEnv()); + } catch (UnknownHostException e) { + LOG.debug("Exception on getting hostname from env.", e); + } + } + return instances; } private void scheduleAggregatorThread(final TimelineMetricAggregator aggregator,
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java index 04f5c1c..de63d4e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; @@ -92,7 +92,7 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg String tableName, String outputTableName, Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { this(aggregatorName, hBaseAccessor, metricsConf); this.checkpointLocation = checkpointLocation; this.sleepIntervalMillis = sleepIntervalMillis; http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index 4c44f9e..2eb3553 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.commons.io.FilenameUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import static java.util.concurrent.TimeUnit.SECONDS; @@ -95,7 +95,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -145,7 +145,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -195,7 +195,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineMetricAggregatorDaily (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -247,7 +247,7 @@ public class TimelineMetricAggregatorFactory { public static TimelineMetricAggregator createTimelineClusterAggregatorSecond( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, TimelineMetricMetadataManager metadataManager, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { 
String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -291,7 +291,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorMinute( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -344,7 +344,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorHourly( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); @@ -397,7 +397,7 @@ public class TimelineMetricAggregatorFactory { */ public static TimelineMetricAggregator createTimelineClusterAggregatorDaily( PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { String checkpointDir = metricsConf.get( TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java index 6438256..02677b9 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java @@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -48,7 +48,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String inputTableName, String outputTableName, Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, 
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index 98b3987..6f3d8bc 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -26,7 +26,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; @@ -76,7 +76,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre String outputTableName, Long nativeTimeRangeDelay, Long timeSliceInterval, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, tableName, outputTableName, nativeTimeRangeDelay, haController); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java index 364a4b5..0ea9c08 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; @@ -49,7 +49,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String tableName, String outputTableName, Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName, outputTableName, nativeTimeRangeDelay, haController); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java index bbb9991..c7b605f 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; @@ -46,7 +46,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator String inputTableName, String outputTableName, Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, inputTableName, outputTableName, http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java index c071708..57a3034 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition; @@ -44,7 +44,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator { String tableName, String outputTableName, Long nativeTimeRangeDelay, - TimelineMetricHAController haController) { + MetricCollectorHAController haController) { super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, hostAggregatorDisabledParam, tableName, outputTableName, nativeTimeRangeDelay, haController); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java index fcd26bd..07d4e2e 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java @@ -38,9 +38,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS; -import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME; public class AggregationTaskRunner { private final String instanceName; http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java new file mode 100644 index 0000000..18b9059 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; + +import com.google.common.base.Joiner; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; +import org.apache.helix.controller.GenericHelixController; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.OnlineOfflineSMD; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO; + +public class MetricCollectorHAController { + private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class); + + static final String CLUSTER_NAME = "ambari-metrics-cluster"; + static final String METRIC_AGGREGATORS = 
"METRIC_AGGREGATORS"; + static final String STATE_MODEL_NAME = OnlineOfflineSMD.name; + static final String INSTANCE_NAME_DELIMITER = "_"; + + final String zkConnectUrl; + final String instanceHostname; + final InstanceConfig instanceConfig; + final AggregationTaskRunner aggregationTaskRunner; + + // Cache list of known live instances + final List<String> liveInstanceNames = new ArrayList<>(); + + // Helix Admin + HelixAdmin admin; + // Helix Manager + HelixManager manager; + + private volatile boolean isInitialized = false; + + public MetricCollectorHAController(TimelineMetricConfiguration configuration) { + String instancePort; + try { + instanceHostname = configuration.getInstanceHostnameFromEnv(); + instancePort = configuration.getInstancePort(); + + } catch (Exception e) { + LOG.error("Error reading configs from classpath, will resort to defaults.", e); + throw new MetricsSystemInitializationException(e.getMessage()); + } + + try { + String zkClientPort = configuration.getZKClientPort(); + String zkQuorum = configuration.getZKQuorum(); + + if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) { + throw new Exception("Unable to parse zookeeper quorum. 
clientPort = " + + zkClientPort +", quorum = " + zkQuorum); + } + + zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum); + + } catch (Exception e) { + LOG.error("Unable to load hbase-site from classpath.", e); + throw new MetricsSystemInitializationException(e.getMessage()); + } + + instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort); + instanceConfig.setHostName(instanceHostname); + instanceConfig.setPort(instancePort); + instanceConfig.setInstanceEnabled(true); + aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl); + } + + /** + * Initialize the instance with zookeeper via Helix + */ + public void initializeHAController() throws Exception { + admin = new ZKHelixAdmin(zkConnectUrl); + // create cluster + LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME); + admin.addCluster(CLUSTER_NAME, false); + + // Adding host to the cluster + List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME); + if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) { + LOG.info("Adding participant instance " + instanceConfig); + admin.addInstance(CLUSTER_NAME, instanceConfig); + } + + // Add a state model + if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) { + LOG.info("Adding ONLINE-OFFLINE state model to the cluster"); + admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, OnlineOfflineSMD.build()); + } + + // Add resources with 1 cluster-wide replica + // Since our aggregators are unbalanced in terms of work distribution we + // only need to distribute writes to METRIC_AGGREGATE and + // METRIC_RECORD_MINUTE + List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME); + if (!resources.contains(METRIC_AGGREGATORS)) { + LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas"); + admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, OnlineOfflineSMD.name, FULL_AUTO.toString()); + } + // this will set 
up the ideal state, it calculates the preference list for + // each partition similar to consistent hashing + admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1); + + // Start participant + startAggregators(); + + // Start controller + startController(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + aggregationTaskRunner.stop(); + manager.disconnect(); + } + }); + + isInitialized = true; + } + + /** + * Return true if HA controller is enabled. + */ + public boolean isInitialized() { + return isInitialized; + } + + private void startAggregators() { + try { + aggregationTaskRunner.initialize(); + + } catch (Exception e) { + LOG.error("Unable to start aggregators.", e); + throw new MetricsSystemInitializationException(e.getMessage()); + } + } + + private void startController() throws Exception { + manager = HelixManagerFactory.getZKHelixManager( + CLUSTER_NAME, + instanceHostname, + InstanceType.CONTROLLER, + zkConnectUrl + ); + + manager.connect(); + HelixController controller = new HelixController(); + manager.addLiveInstanceChangeListener(controller); + } + + private String getZkConnectionUrl(String zkClientPort, String zkQuorum) { + StringBuilder sb = new StringBuilder(); + String[] quorumParts = zkQuorum.split(","); + String prefix = ""; + for (String part : quorumParts) { + sb.append(prefix); + sb.append(part.trim()); + if (!part.contains(":")) { + sb.append(":"); + sb.append(zkClientPort); + } + prefix = ","; + } + + return sb.toString(); + } + + public AggregationTaskRunner getAggregationTaskRunner() { + return aggregationTaskRunner; + } + + public List<String> getLiveInstanceHostNames() { + List<String> liveInstanceHostNames = new ArrayList<>(); + + for (String instance : liveInstanceNames) { + liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]); + } + + return liveInstanceHostNames; + } + + public class HelixController extends GenericHelixController { + ScheduledExecutorService executorService 
= Executors.newSingleThreadScheduledExecutor(); + Joiner joiner = Joiner.on(", ").skipNulls(); + + @Override + public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { + super.onLiveInstanceChange(liveInstances, changeContext); + + liveInstanceNames.clear(); + for (LiveInstance instance : liveInstances) { + liveInstanceNames.add(instance.getInstanceName()); + } + + LOG.info("Detected change in liveliness of Collector instances. " + + "LiveIsntances = " + joiner.join(liveInstanceNames)); + // Print HA state - after some delay + executorService.schedule(new Runnable() { + @Override + public void run() { + printClusterState(); + } + }, 30, TimeUnit.SECONDS); + + + } + } + + public void printClusterState() { + StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################"); + + ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS); + if (resourceExternalView != null) { + getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb); + } + sb.append("\n##################################################"); + LOG.info(sb.toString()); + } + + private void getPrintableResourceState(ExternalView resourceExternalView, + String resourceName, + StringBuilder sb) { + TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet()); + sb.append("\nCLUSTER: "); + sb.append(CLUSTER_NAME); + sb.append("\nRESOURCE: "); + sb.append(resourceName); + for (String partitionName : sortedSet) { + sb.append("\nPARTITION: "); + sb.append(partitionName).append("\t"); + Map<String, String> states = resourceExternalView.getStateMap(partitionName); + for (Map.Entry<String, String> stateEntry : states.entrySet()) { + sb.append("\t"); + sb.append(stateEntry.getKey()); + sb.append("\t"); + sb.append(stateEntry.getValue()); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java deleted file mode 100644 index 53b9e7e..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java +++ /dev/null @@ -1,276 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import com.google.common.base.Joiner; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; -import org.apache.helix.HelixAdmin; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.NotificationContext; -import org.apache.helix.controller.GenericHelixController; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.OnlineOfflineSMD; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO; - -public class TimelineMetricHAController { - private static final Log LOG = LogFactory.getLog(TimelineMetricHAController.class); - - static final String CLUSTER_NAME = "ambari-metrics-cluster"; - static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS"; - static final String STATE_MODEL_NAME = OnlineOfflineSMD.name; - static final String INSTANCE_NAME_DELIMITER = "_"; - - final String zkConnectUrl; - final String instanceHostname; - final InstanceConfig instanceConfig; - final AggregationTaskRunner aggregationTaskRunner; - - // Cache list of known live instances - final List<String> liveInstanceNames = new ArrayList<>(); - - // Helix Admin - HelixAdmin admin; - 
// Helix Manager - HelixManager manager; - - private volatile boolean isInitialized = false; - - public TimelineMetricHAController(TimelineMetricConfiguration configuration) { - String instancePort; - try { - instanceHostname = configuration.getInstanceHostnameFromEnv(); - instancePort = configuration.getInstancePort(); - - } catch (Exception e) { - LOG.error("Error reading configs from classpath, will resort to defaults.", e); - throw new MetricsSystemInitializationException(e.getMessage()); - } - - try { - String zkClientPort = configuration.getZKClientPort(); - String zkQuorum = configuration.getZKQuorum(); - - if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) { - throw new Exception("Unable to parse zookeeper quorum. clientPort = " - + zkClientPort +", quorum = " + zkQuorum); - } - - zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum); - - } catch (Exception e) { - LOG.error("Unable to load hbase-site from classpath.", e); - throw new MetricsSystemInitializationException(e.getMessage()); - } - - instanceConfig = new InstanceConfig(instanceHostname + INSTANCE_NAME_DELIMITER + instancePort); - instanceConfig.setHostName(instanceHostname); - instanceConfig.setPort(instancePort); - instanceConfig.setInstanceEnabled(true); - aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl); - } - - /** - * Initialize the instance with zookeeper via Helix - */ - public void initializeHAController() throws Exception { - admin = new ZKHelixAdmin(zkConnectUrl); - // create cluster - LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME); - admin.addCluster(CLUSTER_NAME, false); - - // Adding host to the cluster - List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME); - if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) { - LOG.info("Adding participant instance " + instanceConfig); - admin.addInstance(CLUSTER_NAME, instanceConfig); - } - - // Add a state model - if 
(admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) { - LOG.info("Adding ONLINE-OFFLINE state model to the cluster"); - admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, OnlineOfflineSMD.build()); - } - - // Add resources with 1 cluster-wide replica - // Since our aggregators are unbalanced in terms of work distribution we - // only need to distribute writes to METRIC_AGGREGATE and - // METRIC_RECORD_MINUTE - List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME); - if (!resources.contains(METRIC_AGGREGATORS)) { - LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas"); - admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, OnlineOfflineSMD.name, FULL_AUTO.toString()); - } - // this will set up the ideal state, it calculates the preference list for - // each partition similar to consistent hashing - admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1); - - // Start participant - startAggregators(); - - // Start controller - startController(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - aggregationTaskRunner.stop(); - manager.disconnect(); - } - }); - - isInitialized = true; - } - - /** - * Return true if HA controller is enabled. 
- */ - public boolean isInitialized() { - return isInitialized; - } - - private void startAggregators() { - try { - aggregationTaskRunner.initialize(); - - } catch (Exception e) { - LOG.error("Unable to start aggregators.", e); - throw new MetricsSystemInitializationException(e.getMessage()); - } - } - - private void startController() throws Exception { - manager = HelixManagerFactory.getZKHelixManager( - CLUSTER_NAME, - instanceHostname, - InstanceType.CONTROLLER, - zkConnectUrl - ); - - manager.connect(); - HelixController controller = new HelixController(); - manager.addLiveInstanceChangeListener(controller); - } - - private String getZkConnectionUrl(String zkClientPort, String zkQuorum) { - StringBuilder sb = new StringBuilder(); - String[] quorumParts = zkQuorum.split(","); - String prefix = ""; - for (String part : quorumParts) { - sb.append(prefix); - sb.append(part.trim()); - if (!part.contains(":")) { - sb.append(":"); - sb.append(zkClientPort); - } - prefix = ","; - } - - return sb.toString(); - } - - public AggregationTaskRunner getAggregationTaskRunner() { - return aggregationTaskRunner; - } - - public List<String> getLiveInstanceHostNames() { - List<String> liveInstanceHostNames = new ArrayList<>(); - - for (String instance : liveInstanceNames) { - liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]); - } - - return liveInstanceHostNames; - } - - public class HelixController extends GenericHelixController { - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - Joiner joiner = Joiner.on(", ").skipNulls(); - - @Override - public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { - super.onLiveInstanceChange(liveInstances, changeContext); - - liveInstanceNames.clear(); - for (LiveInstance instance : liveInstances) { - liveInstanceNames.add(instance.getInstanceName()); - } - - LOG.info("Detected change in liveliness of Collector instances. 
" + - "LiveIsntances = " + joiner.join(liveInstanceNames)); - // Print HA state - after some delay - executorService.schedule(new Runnable() { - @Override - public void run() { - printClusterState(); - } - }, 30, TimeUnit.SECONDS); - - - } - } - - public void printClusterState() { - StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################"); - - ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS); - if (resourceExternalView != null) { - getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb); - } - sb.append("\n##################################################"); - LOG.info(sb.toString()); - } - - private void getPrintableResourceState(ExternalView resourceExternalView, - String resourceName, - StringBuilder sb) { - TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet()); - sb.append("\nCLUSTER: "); - sb.append(CLUSTER_NAME); - sb.append("\nRESOURCE: "); - sb.append(resourceName); - for (String partitionName : sortedSet) { - sb.append("\nPARTITION: "); - sb.append(partitionName).append("\t"); - Map<String, String> states = resourceExternalView.getStateMap(partitionName); - for (Map.Entry<String, String> stateEntry : states.entrySet()) { - sb.append("\t"); - sb.append(stateEntry.getKey()); - sb.append("\t"); - sb.append(stateEntry.getValue()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java index 4f54284..6b15e29 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java @@ -411,6 +411,14 @@ public class TimelineWebServices { } } + /** + * This is a discovery endpoint that advertises known live collector + * instances. Note: It will always answer with current instance as live. + * This can be utilized as a liveliness pinger endpoint since the instance + * names are cached and thereby no synchronous calls result from this API. + * + * @return {@code List<String>} of hostnames + */ @GET @Path("/metrics/livenodes") @Produces({ MediaType.APPLICATION_JSON }) http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java new file mode 100644 index 0000000..91ec305 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; + +import junit.framework.Assert; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static 
org.easymock.EasyMock.replay; + +public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTest { + TimelineMetricConfiguration configuration; + + @Before + public void setup() throws Exception { + configuration = createNiceMock(TimelineMetricConfiguration.class); + + expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1"); + expect(configuration.getInstancePort()).andReturn("12000"); + // jdbc:phoenix:localhost:52887:/hbase;test=true + String zkUrl = getUrl(); + String port = zkUrl.split(":")[3]; + String quorum = zkUrl.split(":")[2]; + + expect(configuration.getZKClientPort()).andReturn(port); + expect(configuration.getZKQuorum()).andReturn(quorum); + + replay(configuration); + } + + @Test(timeout = 180000) + public void testHAControllerDistributedAggregation() throws Exception { + MetricCollectorHAController haController = new MetricCollectorHAController(configuration); + haController.initializeHAController(); + // Wait for task assignment + Thread.sleep(10000); + + Assert.assertTrue(haController.isInitialized()); + Assert.assertEquals(1, haController.getLiveInstanceHostNames().size()); + Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation()); + Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation()); + + // Add new instance + InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001"); + haController.admin.addInstance(CLUSTER_NAME, instanceConfig2); + HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, + instanceConfig2.getInstanceName(), + InstanceType.PARTICIPANT, haController.zkConnectUrl); + manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME, + new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(), + new AggregationTaskRunner(instanceConfig2.getInstanceName(), ""))); + manager2.connect(); + haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1); + + // Wait on re-assignment of partitions + 
Thread.sleep(10000); + Assert.assertEquals(2, haController.getLiveInstanceHostNames().size()); + + ExternalView view = haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS); + + Map<String, String> partitionInstanceMap = new HashMap<>(); + + for (String partition : view.getPartitionSet()) { + Map<String, String> states = view.getStateMap(partition); + // (instance, state) pairs + for (Map.Entry<String, String> stateEntry : states.entrySet()) { + partitionInstanceMap.put(partition, stateEntry.getKey()); + Assert.assertEquals("ONLINE", stateEntry.getValue()); + } + } + // Re-assigned partitions + Assert.assertEquals(2, partitionInstanceMap.size()); + + haController.getAggregationTaskRunner().stop(); + haController.manager.disconnect(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java deleted file mode 100644 index 04e8909..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability; - -import junit.framework.Assert; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.InstanceConfig; -import org.junit.Before; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; - -public class TimelineMetricHAControllerTest extends AbstractMiniHBaseClusterTest { - TimelineMetricConfiguration configuration; - - @Before - 
public void setup() throws Exception { - configuration = createNiceMock(TimelineMetricConfiguration.class); - - expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1"); - expect(configuration.getInstancePort()).andReturn("12000"); - // jdbc:phoenix:localhost:52887:/hbase;test=true - String zkUrl = getUrl(); - String port = zkUrl.split(":")[3]; - String quorum = zkUrl.split(":")[2]; - - expect(configuration.getZKClientPort()).andReturn(port); - expect(configuration.getZKQuorum()).andReturn(quorum); - - replay(configuration); - } - - @Test(timeout = 150000) - public void testHAControllerDistributedAggregation() throws Exception { - TimelineMetricHAController haController = new TimelineMetricHAController(configuration); - haController.initializeHAController(); - // Wait for task assignment - Thread.sleep(10000); - - Assert.assertTrue(haController.isInitialized()); - Assert.assertEquals(1, haController.getLiveInstanceHostNames().size()); - Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation()); - Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation()); - - // Add new instance - InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001"); - haController.admin.addInstance(CLUSTER_NAME, instanceConfig2); - HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, - instanceConfig2.getInstanceName(), - InstanceType.PARTICIPANT, haController.zkConnectUrl); - manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME, - new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(), - new AggregationTaskRunner(instanceConfig2.getInstanceName(), ""))); - manager2.connect(); - haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1); - - // Wait on re-assignment of partitions - Thread.sleep(10000); - Assert.assertEquals(2, haController.getLiveInstanceHostNames().size()); - - ExternalView view = haController.admin.getResourceExternalView(CLUSTER_NAME, 
METRIC_AGGREGATORS); - - Map<String, String> partitionInstanceMap = new HashMap<>(); - - for (String partition : view.getPartitionSet()) { - Map<String, String> states = view.getStateMap(partition); - // (instance, state) pairs - for (Map.Entry<String, String> stateEntry : states.entrySet()) { - partitionInstanceMap.put(partition, stateEntry.getKey()); - Assert.assertEquals("ONLINE", stateEntry.getValue()); - } - } - // Re-assigned partitions - Assert.assertEquals(2, partitionInstanceMap.size()); - - haController.getAggregationTaskRunner().stop(); - haController.manager.disconnect(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py index be50524..3203a52 100644 --- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py @@ -86,6 +86,7 @@ if config.has_key('hostname'): ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", []) has_metric_collector = not len(ams_collector_hosts) == 0 +metric_collector_port = None if has_metric_collector: if 'cluster-env' in config['configurations'] and \ 'metrics_collector_vip_host' in config['configurations']['cluster-env']: @@ -111,3 +112,22 @@ if has_metric_collector: pass metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60) metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10) + +# Collector hosts: comma-separated host:port list (stays None when ams_collector_hosts is empty) +metric_collector_hosts = None +if ams_collector_hosts: + # Build with join: appending to the initial None with += raises TypeError + 
metric_collector_hosts = ','.join( + host + ':' + metric_collector_port for host in ams_collector_hosts) + +# Cluster Zookeeper quorum +zookeeper_quorum = None +if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0: + if 'zoo.cfg' in config['configurations'] and 'clientPort' in config['configurations']['zoo.cfg']: + zookeeper_clientPort = config['configurations']['zoo.cfg']['clientPort'] + else: + zookeeper_clientPort = '2181' + zookeeper_quorum = (':' + zookeeper_clientPort + ',').join(config['clusterHostInfo']['zookeeper_hosts']) + # last port config + zookeeper_quorum += ':' + zookeeper_clientPort + http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 index df68242..b960296 100644 --- a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 +++ b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2 @@ -16,7 +16,10 @@ # limitations under the License. 
#} -collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} +collector={{metric_collector_hosts}} +protocol={{metric_collector_protocol}} +zookeeper.quorum={{zookeeper_quorum}} +port={{metric_collector_port}} collectionFrequency={{metrics_collection_period}}000 maxRowCacheSize=10000 sendInterval={{metrics_report_interval}}000 http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 index 7763bdd..e62ce9e 100644 --- a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 +++ b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2 @@ -51,6 +51,7 @@ hbase.extendedperiod = 3600 *.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar *.sink.timeline.slave.host.name={{hostname}} + hbase.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink hbase.period={{metrics_collection_period}} hbase.collector={{metric_collector_host}}:{{metric_collector_port}} @@ -66,7 +67,8 @@ rpc.collector={{metric_collector_host}}:{{metric_collector_port}} hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink hbase.sink.timeline.period={{metrics_collection_period}} hbase.sink.timeline.sendInterval={{metrics_report_interval}}000 
-hbase.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} +hbase.sink.timeline.collector={{metric_collector_hosts}} +hbase.sink.timeline.protocol={{metric_collector_protocol}} # HTTPS properties hbase.sink.timeline.truststore.path = {{metric_truststore_path}} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py index 1ee4422..17f9e2c 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py @@ -170,6 +170,7 @@ if stack_supports_storm_kerberos: ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", []) has_metric_collector = not len(ams_collector_hosts) == 0 +metric_collector_port = None if has_metric_collector: if 'cluster-env' in config['configurations'] and \ 'metrics_collector_vip_host' in config['configurations']['cluster-env']: @@ -201,6 +202,20 @@ metrics_collection_period = default("/configurations/ams-site/timeline.metrics.s metric_collector_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar" metric_collector_legacy_sink_jar = "/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar" +# Collector hosts +metric_collector_hosts = None +if ams_collector_hosts: + for host in ams_collector_hosts: + metric_collector_hosts += host + ':' + metric_collector_port + ',' + metric_collector_hosts = metric_collector_hosts[:-1] + +# Cluster Zookeeper quorum +zookeeper_quorum = None +if storm_zookeeper_servers: + for server in storm_zookeeper_servers: 
+ zookeeper_quorum += server + ':' + storm_zookeeper_port + "," + zookeeper_quorum = zookeeper_quorum[:-1] + jar_jvm_opts = '' ######################################################## http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 index ebeb887..1f0875f 100644 --- a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 +++ b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2 @@ -16,7 +16,10 @@ # limitations under the License. #} -collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} +collector={{metric_collector_hosts}} +protocol={{metric_collector_protocol}} +port={{metric_collector_port}} +zookeeper.quorum={{zookeeper_quorum}} maxRowCacheSize=10000 sendInterval={{metrics_report_interval}}000 clusterReporterAppId=nimbus http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py index 568d418..cf61539 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py @@ -109,8 +109,11 @@ is_rmnode_master = hostname in rm_host 
is_hsnode_master = hostname in hs_host is_hbase_master = hostname in hbase_master_hosts is_slave = hostname in slave_hosts + if has_ganglia_server: ganglia_server_host = ganglia_server_hosts[0] + +metric_collector_port = None if has_metric_collector: if 'cluster-env' in config['configurations'] and \ 'metrics_collector_vip_host' in config['configurations']['cluster-env']: @@ -138,6 +141,24 @@ if has_metric_collector: metrics_report_interval = default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60) metrics_collection_period = default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10) +#Collector hosts +metric_collector_hosts = None +if ams_collector_hosts: + for host in ams_collector_hosts: + metric_collector_hosts += host + ':' + metric_collector_port + ',' + metric_collector_hosts = metric_collector_hosts[:-1] + +# Cluster Zookeeper quorum +zookeeper_quorum = None +if has_zk_host: + if 'zoo.cfg' in config['configurations'] and 'clientPort' in config['configurations']['zoo.cfg']: + zookeeper_clientPort = config['configurations']['zoo.cfg']['clientPort'] + else: + zookeeper_clientPort = '2181' + zookeeper_quorum = (':' + zookeeper_clientPort + ',').join(config['clusterHostInfo']['zookeeper_hosts']) + # last port config + zookeeper_quorum += ':' + zookeeper_clientPort + #hadoop params if has_namenode or dfs_type == 'HCFS': http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 index ad5c743..075d2c9 100644 --- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2 @@ -72,19 +72,22 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue *.sink.timeline.period={{metrics_collection_period}} *.sink.timeline.sendInterval={{metrics_report_interval}}000 *.sink.timeline.slave.host.name = {{hostname}} +*.sink.timeline.zookeeper.quorum={{zookeeper_quorum}} +*.sink.timeline.protocol={{metric_collector_protocol}} +*.sink.timeline.port={{metric_collector_port}} # HTTPS properties *.sink.timeline.truststore.path = {{metric_truststore_path}} *.sink.timeline.truststore.type = {{metric_truststore_type}} *.sink.timeline.truststore.password = {{metric_truststore_password}} -datanode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} -namenode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} -resourcemanager.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} -nodemanager.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} -jobhistoryserver.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} -journalnode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} -applicationhistoryserver.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}} +datanode.sink.timeline.collector={{metrics_collector_hosts}} +namenode.sink.timeline.collector={{metrics_collector_hosts}} +resourcemanager.sink.timeline.collector={{metrics_collector_hosts}} +nodemanager.sink.timeline.collector={{metrics_collector_hosts}} 
+jobhistoryserver.sink.timeline.collector={{metrics_collector_hosts}} +journalnode.sink.timeline.collector={{metrics_collector_hosts}} +applicationhistoryserver.sink.timeline.collector={{metrics_collector_hosts}} resourcemanager.sink.timeline.tagsForPrefix.yarn=Queue
