Repository: ambari Updated Branches: refs/heads/branch-2.0.0 3f9ae9ef0 -> 913c086a4
AMBARI-9970. Metrics are absent for Storm. (mpapyrkovskyy via swagle) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/913c086a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/913c086a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/913c086a Branch: refs/heads/branch-2.0.0 Commit: 913c086a4626b0925e1f7d7a104b46b5579d9b29 Parents: 3f9ae9e Author: Siddharth Wagle <[email protected]> Authored: Fri Mar 6 20:01:23 2015 -0800 Committer: Siddharth Wagle <[email protected]> Committed: Fri Mar 6 20:01:23 2015 -0800 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 26 ++++++-------------- .../cache/HandleConnectExceptionTest.java | 4 --- .../sink/flume/FlumeTimelineMetricsSink.java | 13 ---------- .../timeline/HadoopTimelineMetricsSink.java | 7 ------ .../kafka/KafkaTimelineMetricsReporter.java | 15 ----------- .../storm/StormTimelineMetricsReporter.java | 21 +++++----------- .../sink/storm/StormTimelineMetricsSink.java | 17 ------------- .../storm/StormTimelineMetricsSinkTest.java | 1 - 8 files changed, 14 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 4f5c6a1..fd4cacd 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -19,10 +19,6 @@ package org.apache.hadoop.metrics2.sink.timeline; import java.io.IOException; import java.net.ConnectException; -import java.net.SocketAddress; - -import java.io.IOException; -import java.net.SocketAddress; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.methods.PostMethod; @@ -63,19 +59,15 @@ public abstract class AbstractTimelineMetricsSink { try { String jsonData = mapper.writeValueAsString(metrics); - SocketAddress socketAddress = getServerSocketAddress(); + StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8"); - if (socketAddress != null) { - StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8"); - - PostMethod postMethod = new PostMethod(connectUrl); - postMethod.setRequestEntity(requestEntity); - int statusCode = httpClient.executeMethod(postMethod); - if (statusCode != 200) { - LOG.info("Unable to POST metrics to collector, " + connectUrl); - } else { - LOG.debug("Metrics posted to Collector " + connectUrl); - } + PostMethod postMethod = new PostMethod(connectUrl); + postMethod.setRequestEntity(requestEntity); + int statusCode = httpClient.executeMethod(postMethod); + if (statusCode != 200) { + LOG.info("Unable to POST metrics to collector, " + connectUrl); + } else { + LOG.debug("Metrics posted to Collector " + connectUrl); } } catch (ConnectException e) { throw new UnableToConnectException(e).setConnectUrl(connectUrl); @@ -86,7 +78,5 @@ public abstract class AbstractTimelineMetricsSink { this.httpClient = httpClient; } - abstract protected SocketAddress getServerSocketAddress(); - abstract protected String getCollectorUri(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java index 450906a..2786e3c 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java @@ -65,10 +65,6 @@ public class HandleConnectExceptionTest { } class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{ @Override - protected SocketAddress getServerSocketAddress() { - return new InetSocketAddress("host", 13); - } - @Override protected String getCollectorUri() { return COLLECTOR_URL; } http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java index 9e66c99..a6137af 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -30,11 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; -import org.apache.hadoop.metrics2.sink.util.Servers; import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -50,7 +48,6 @@ import java.util.concurrent.TimeUnit; public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService { - private SocketAddress socketAddress; private String collectorUri; private TimelineMetricsCache metricsCache; private ScheduledExecutorService scheduledExecutorService; @@ -94,11 +91,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem String collectorHostname = configuration.getProperty(COLLECTOR_HOST_PROPERTY); String port = configuration.getProperty(COLLECTOR_PORT_PROPERTY); collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics"; - List<InetSocketAddress> socketAddresses = - Servers.parse(collectorHostname, Integer.valueOf(port)); - if (socketAddresses != null && !socketAddresses.isEmpty()) { - socketAddress = socketAddresses.get(0); - } pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency")); String[] metrics = configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(","); @@ -106,11 +98,6 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem } @Override - public SocketAddress getServerSocketAddress() { - return socketAddress; - } - - @Override public String getCollectorUri() { return collectorUri; } http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 06f6011..9ecb0ed 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -131,13 +131,6 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple return conf.getPrefix(); } - protected SocketAddress getServerSocketAddress() { - if (metricsServers != null && !metricsServers.isEmpty()) { - return metricsServers.get(0); - } - return null; - } - @Override protected String getCollectorUri() { return collectorUri; http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index 1f44494..cc365bd 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -20,8 +20,6 @@ package org.apache.hadoop.metrics2.sink.kafka; import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -40,7 +38,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.apache.hadoop.metrics2.sink.util.Servers; import com.yammer.metrics.Metrics; import com.yammer.metrics.core.Counter; @@ -73,16 +70,10 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im private final Object lock = new Object(); private String collectorUri; private String hostname; - private SocketAddress socketAddress; private TimelineScheduledReporter reporter; private TimelineMetricsCache metricsCache; @Override - protected SocketAddress getServerSocketAddress() { - return socketAddress; - } - - @Override protected String getCollectorUri() { return collectorUri; } @@ -110,18 +101,12 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT); setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval)); collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics"; - List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost, - Integer.parseInt(metricCollectorPort)); - if (socketAddresses != null && !socketAddresses.isEmpty()) { - socketAddress = socketAddresses.get(0); - } initializeReporter(); if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) { startReporter(metricsConfig.pollingIntervalSecs()); } if (LOG.isTraceEnabled()) { LOG.trace("CollectorUri = " + collectorUri); - LOG.trace("SocketAddress = " + socketAddress); LOG.trace("MetricsSendInterval = " + metricsSendInterval); LOG.trace("MaxRowCacheSize = " + maxRowCacheSize); } http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 2d4baa3..89fc2ca 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -29,11 +29,9 @@ import org.apache.commons.lang.Validate; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.util.Servers; +import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -48,7 +46,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink public static final String APP_ID = "appId"; private String hostname; - private SocketAddress socketAddress; private String collectorUri; private NimbusClient nimbusClient; private String applicationId; @@ -58,11 +55,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink } @Override - protected SocketAddress getServerSocketAddress() { - return this.socketAddress; - } - - @Override protected String getCollectorUri() { return this.collectorUri; } @@ -85,11 +77,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink String port = cf.get(COLLECTOR_PORT).toString(); applicationId = cf.get(APP_ID).toString(); collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics"; - List<InetSocketAddress> socketAddresses = - Servers.parse(collectorHostname, Integer.valueOf(port)); - if (socketAddresses != null && !socketAddresses.isEmpty()) { - socketAddress = socketAddresses.get(0); - } } catch (Exception e) { LOG.warn("Could not initialize metrics collector, please specify host, " + "port under $STORM_HOME/conf/config.yaml ", e); @@ -139,7 +126,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink TimelineMetrics timelineMetrics = new TimelineMetrics(); timelineMetrics.setMetrics(totalMetrics); - emitMetrics(timelineMetrics); + try { + emitMetrics(timelineMetrics); + } catch (UnableToConnectException e) { + LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage()); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index dd0e72f..767695b 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -30,12 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; -import org.apache.hadoop.metrics2.sink.util.Servers; import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; @@ -43,17 +40,11 @@ import java.util.List; import java.util.Map; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { - private SocketAddress socketAddress; private String collectorUri; private TimelineMetricsCache metricsCache; private String hostname; @Override - protected SocketAddress getServerSocketAddress() { - return socketAddress; - } - - @Override protected String getCollectorUri() { return collectorUri; } @@ -74,11 +65,6 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics"; - List<InetSocketAddress> socketAddresses = - Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), Integer.valueOf(configuration.getProperty(COLLECTOR_PORT_PROPERTY))); - if (socketAddresses != null && !socketAddresses.isEmpty()) { - socketAddress = socketAddresses.get(0); - } } @Override @@ -134,7 +120,4 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem this.metricsCache = metricsCache; } - public void setServerSocketAddress(SocketAddress socketAddress) { - this.socketAddress = socketAddress; - } } http://git-wip-us.apache.org/repos/asf/ambari/blob/913c086a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 15021e5..a0600e5 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -64,7 +64,6 @@ public class StormTimelineMetricsSinkTest { HttpClient httpClient = createNiceMock(HttpClient.class); stormTimelineMetricsSink.setHttpClient(httpClient); expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once(); - stormTimelineMetricsSink.setServerSocketAddress(createNiceMock(SocketAddress.class)); replay(timelineMetricsCache, httpClient); stormTimelineMetricsSink.handleDataPoints( new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
