Repository: hadoop
Updated Branches:
  refs/heads/branch-2 05d831c4b -> aee297cce
  refs/heads/trunk f33c99ba3 -> 8004a0023


HADOOP-10181. GangliaContext does not work with multicast ganglia setup. 
Contributed by Andrew Johnson.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8004a002
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8004a002
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8004a002

Branch: refs/heads/trunk
Commit: 8004a002307940176cc188657c68e85171a5b5a8
Parents: f33c99b
Author: cnauroth <[email protected]>
Authored: Mon Feb 2 11:09:09 2015 -0800
Committer: cnauroth <[email protected]>
Committed: Mon Feb 2 11:09:09 2015 -0800

----------------------------------------------------------------------
 hadoop-common-project/hadoop-common/CHANGES.txt |  3 +
 .../hadoop/metrics/ganglia/GangliaContext.java  | 33 ++++++--
 .../apache/hadoop/metrics/ganglia/package.html  |  6 ++
 .../sink/ganglia/AbstractGangliaSink.java       | 36 ++++++---
 .../metrics/ganglia/TestGangliaContext.java     | 41 ++++++++++
 .../metrics2/sink/ganglia/TestGangliaSink.java  | 81 ++++++++++++++++++++
 6 files changed, 183 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8004a002/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt 
b/hadoop-common-project/hadoop-common/CHANGES.txt
index f3647fb..612c0f5 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -811,6 +811,9 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11432. Fix SymlinkBaseTest#testCreateLinkUsingPartQualPath2.
     (Liang Xie via gera)
 
+    HADOOP-10181. GangliaContext does not work with multicast ganglia setup.
+    (Andrew Johnson via cnauroth)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8004a002/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
index 0e70778..5ed2652 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/GangliaContext.java
@@ -21,10 +21,7 @@
 package org.apache.hadoop.metrics.ganglia;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.SocketAddress;
-import java.net.SocketException;
+import java.net.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,13 +51,16 @@ public class GangliaContext extends AbstractMetricsContext {
   private static final String SLOPE_PROPERTY = "slope";
   private static final String TMAX_PROPERTY = "tmax";
   private static final String DMAX_PROPERTY = "dmax";
-    
+  private static final String MULTICAST_PROPERTY = "multicast";
+  private static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
+
   private static final String DEFAULT_UNITS = "";
   private static final String DEFAULT_SLOPE = "both";
   private static final int DEFAULT_TMAX = 60;
   private static final int DEFAULT_DMAX = 0;
   private static final int DEFAULT_PORT = 8649;
   private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
+  private static final int DEFAULT_MULTICAST_TTL = 1;
 
   private final Log LOG = LogFactory.getLog(this.getClass());    
 
@@ -83,6 +83,8 @@ public class GangliaContext extends AbstractMetricsContext {
   private Map<String,String> slopeTable;
   private Map<String,String> tmaxTable;
   private Map<String,String> dmaxTable;
+  private boolean multicastEnabled;
+  private int multicastTtl;
     
   protected DatagramSocket datagramSocket;
     
@@ -104,11 +106,26 @@ public class GangliaContext extends 
AbstractMetricsContext {
     slopeTable = getAttributeTable(SLOPE_PROPERTY);
     tmaxTable  = getAttributeTable(TMAX_PROPERTY);
     dmaxTable  = getAttributeTable(DMAX_PROPERTY);
+    multicastEnabled = Boolean.parseBoolean(getAttribute(MULTICAST_PROPERTY));
+    String multicastTtlValue = getAttribute(MULTICAST_TTL_PROPERTY);
+    if (multicastEnabled) {
+      if (multicastTtlValue == null) {
+        multicastTtl = DEFAULT_MULTICAST_TTL;
+      } else {
+        multicastTtl = Integer.parseInt(multicastTtlValue);
+      }
+    }
         
     try {
-      datagramSocket = new DatagramSocket();
-    } catch (SocketException se) {
-      se.printStackTrace();
+      if (multicastEnabled) {
+        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
+        datagramSocket = new MulticastSocket();
+        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
+      } else {
+        datagramSocket = new DatagramSocket();
+      }
+    } catch (IOException e) {
+      LOG.error(e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8004a002/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
index 87598e5..b9acfae 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics/ganglia/package.html
@@ -54,6 +54,12 @@ These are the implementation specific factory attributes
     <dt><i>contextName</i>.period</dt>
     <dd>The period in seconds on which the metric data is sent to the
     server(s).</dd>
+
+    <dt><i>contextName</i>.multicast</dt>
+    <dd>Enable multicast for Ganglia</dd>
+
+    <dt><i>contextName</i>.multicast.ttl</dt>
+    <dd>TTL for multicast packets</dd>
     
     <dt><i>contextName</i>.units.<i>recordName</i>.<i>metricName</i></dt>
     <dd>The units for the specified metric in the specified record.</dd>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8004a002/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
index 88ad647..c9df0ff 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/ganglia/AbstractGangliaSink.java
@@ -19,12 +19,7 @@
 package org.apache.hadoop.metrics2.sink.ganglia;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.net.UnknownHostException;
+import java.net.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +59,11 @@ public abstract class AbstractGangliaSink implements 
MetricsSink {
   public static final int DEFAULT_DMAX = 0;
   public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
   public static final int DEFAULT_PORT = 8649;
+  public static final boolean DEFAULT_MULTICAST_ENABLED = false;
+  public static final int DEFAULT_MULTICAST_TTL = 1;
   public static final String SERVERS_PROPERTY = "servers";
+  public static final String MULTICAST_ENABLED_PROPERTY = "multicast";
+  public static final String MULTICAST_TTL_PROPERTY = "multicast.ttl";
   public static final int BUFFER_SIZE = 1500; // as per libgmond.c
   public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
   public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
@@ -73,6 +72,8 @@ public abstract class AbstractGangliaSink implements 
MetricsSink {
   private String hostName = "UNKNOWN.example.com";
   private DatagramSocket datagramSocket;
   private List<? extends SocketAddress> metricsServers;
+  private boolean multicastEnabled;
+  private int multicastTtl;
   private byte[] buffer = new byte[BUFFER_SIZE];
   private int offset;
   private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
@@ -134,6 +135,9 @@ public abstract class AbstractGangliaSink implements 
MetricsSink {
     // load the gannglia servers from properties
     metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
         DEFAULT_PORT);
+    multicastEnabled = conf.getBoolean(MULTICAST_ENABLED_PROPERTY,
+            DEFAULT_MULTICAST_ENABLED);
+    multicastTtl = conf.getInt(MULTICAST_TTL_PROPERTY, DEFAULT_MULTICAST_TTL);
 
     // extract the Ganglia conf per metrics
     gangliaConfMap = new HashMap<String, GangliaConf>();
@@ -143,9 +147,15 @@ public abstract class AbstractGangliaSink implements 
MetricsSink {
     loadGangliaConf(GangliaConfType.slope);
 
     try {
-      datagramSocket = new DatagramSocket();
-    } catch (SocketException se) {
-      LOG.error(se);
+      if (multicastEnabled) {
+        LOG.info("Enabling multicast for Ganglia with TTL " + multicastTtl);
+        datagramSocket = new MulticastSocket();
+        ((MulticastSocket) datagramSocket).setTimeToLive(multicastTtl);
+      } else {
+        datagramSocket = new DatagramSocket();
+      }
+    } catch (IOException e) {
+      LOG.error(e);
     }
 
     // see if sparseMetrics is supported. Default is false
@@ -295,4 +305,12 @@ public abstract class AbstractGangliaSink implements 
MetricsSink {
   void setDatagramSocket(DatagramSocket datagramSocket) {
     this.datagramSocket = datagramSocket;
   }
+
+  /**
+   * Used only by unit tests
+   * @return the datagramSocket for this sink
+   */
+  DatagramSocket getDatagramSocket() {
+    return datagramSocket;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8004a002/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java
index deb8231..8637f8c 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics/ganglia/TestGangliaContext.java
@@ -22,13 +22,54 @@
 package org.apache.hadoop.metrics.ganglia;
 
 import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.metrics.ContextFactory;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
 
+import java.net.MulticastSocket;
+
 public class TestGangliaContext {
+  @Test
+  public void testShouldCreateDatagramSocketByDefault() throws Exception {
+    GangliaContext context = new GangliaContext();
+    context.init("gangliaContext", ContextFactory.getFactory());
+    assertFalse("Created MulticastSocket", context.datagramSocket instanceof 
MulticastSocket);
+  }
+
+  @Test
+  public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws 
Exception {
+    GangliaContext context = new GangliaContext();
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("gangliaContext.multicast", "false");
+    context.init("gangliaContext", factory);
+    assertFalse("Created MulticastSocket", context.datagramSocket instanceof 
MulticastSocket);
+  }
+
+  @Test
+  public void testShouldCreateMulticastSocket() throws Exception {
+    GangliaContext context = new GangliaContext();
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("gangliaContext.multicast", "true");
+    context.init("gangliaContext", factory);
+    assertTrue("Did not create MulticastSocket", context.datagramSocket 
instanceof MulticastSocket);
+    MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket;
+    assertEquals("Did not set default TTL", multicastSocket.getTimeToLive(), 
1);
+  }
+
+  @Test
+  public void testShouldSetMulticastSocketTtl() throws Exception {
+    GangliaContext context = new GangliaContext();
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("gangliaContext.multicast", "true");
+    factory.setAttribute("gangliaContext.multicast.ttl", "10");
+    context.init("gangliaContext", factory);
+    MulticastSocket multicastSocket = (MulticastSocket) context.datagramSocket;
+    assertEquals("Did not set TTL", multicastSocket.getTimeToLive(), 10);
+  }
   
   @Test
   public void testCloseShouldCloseTheSocketWhichIsCreatedByInit() throws 
Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8004a002/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
new file mode 100644
index 0000000..aa2c259
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/ganglia/TestGangliaSink.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.ganglia;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.metrics2.impl.ConfigBuilder;
+import org.junit.Test;
+
+import java.net.DatagramSocket;
+import java.net.MulticastSocket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestGangliaSink {
+    @Test
+    public void testShouldCreateDatagramSocketByDefault() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .subset("test.sink.ganglia");
+
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertFalse("Did not create DatagramSocket", socket == null || socket 
instanceof MulticastSocket);
+    }
+
+    @Test
+    public void testShouldCreateDatagramSocketIfMulticastIsDisabled() throws 
Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .add("test.sink.ganglia.multicast", false)
+                .subset("test.sink.ganglia");
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertFalse("Did not create DatagramSocket", socket == null || socket 
instanceof MulticastSocket);
+    }
+
+    @Test
+    public void testShouldCreateMulticastSocket() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .add("test.sink.ganglia.multicast", true)
+                .subset("test.sink.ganglia");
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertTrue("Did not create MulticastSocket", socket != null && socket 
instanceof MulticastSocket);
+        int ttl = ((MulticastSocket) socket).getTimeToLive();
+        assertEquals("Did not set default TTL", 1, ttl);
+    }
+
+    @Test
+    public void testShouldSetMulticastSocketTtl() throws Exception {
+        SubsetConfiguration conf = new ConfigBuilder()
+                .add("test.sink.ganglia.multicast", true)
+                .add("test.sink.ganglia.multicast.ttl", 3)
+                .subset("test.sink.ganglia");
+        GangliaSink30 gangliaSink = new GangliaSink30();
+        gangliaSink.init(conf);
+        DatagramSocket socket = gangliaSink.getDatagramSocket();
+        assertTrue("Did not create MulticastSocket", socket != null && socket 
instanceof MulticastSocket);
+        int ttl = ((MulticastSocket) socket).getTimeToLive();
+        assertEquals("Did not set TTL", 3, ttl);
+    }
+}

Reply via email to