Author: edwardyoon
Date: Wed Dec 21 07:49:05 2011
New Revision: 1221636

URL: http://svn.apache.org/viewvc?rev=1221636&view=rev
Log:
[HAMA-489] Check port availability before forking child process

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1221636&r1=1221635&r2=1221636&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Dec 21 07:49:05 2011
@@ -16,6 +16,7 @@ Release 0.4 - Unreleased
 
   BUG FIXES
 
+    HAMA-489: Check port availability before forking child process (edwardyoon)
     HAMA-474: ClusterStatus.getTasks() always returns 0 (edwardyoon)
     HAMA-472: The task should be killed if it fails to initialize (edwardyoon)
     HAMA-465: LocalJobRunner should support combiners and IO (tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1221636&r1=1221635&r2=1221636&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Wed Dec 21 07:49:05 2011
@@ -57,6 +57,7 @@ import org.apache.hama.bsp.sync.SyncExce
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.zookeeper.QuorumPeer;
 import org.apache.log4j.LogManager;
 import org.apache.zookeeper.WatchedEvent;
@@ -146,19 +147,14 @@ public class GroomServer implements Runn
         LOG.info("Launch " + actions.length + " tasks.");
 
         assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
-        int i = 0;
+        int prevPort = Constants.DEFAULT_PEER_PORT;
 
-        // TODO find another way to manage all activate peers.
         for (GroomServerAction action : actions) {
           Task t = ((LaunchTaskAction) action).getTask();
 
-          int peerPort = (Constants.DEFAULT_PEER_PORT + i);
-          assignedPeerNames.put(t.getTaskID(), peerPort);
-
-          i++;
-        }
-
-        for (GroomServerAction action : actions) {
+          prevPort = BSPNetUtils.getNextAvailable(prevPort);
+          assignedPeerNames.put(t.getTaskID(), prevPort);
+          
           if (action instanceof LaunchTaskAction) {
             startNewTask((LaunchTaskAction) action);
           } else {

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1221636&r1=1221635&r2=1221636&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Wed Dec 21 07:49:05 2011
@@ -17,16 +17,22 @@
  */
 package org.apache.hama.util;
 
+import java.io.IOException;
+import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
+import java.util.NoSuchElementException;
 
+import org.apache.hama.Constants;
 import org.apache.mina.util.AvailablePortFinder;
 
 /**
  * NetUtils for our needs.
  */
 public class BSPNetUtils {
+  public static final int MAX_PORT_NUMBER = 65535;
 
   /**
    * Gets the canonical hostname of this machine.
@@ -78,4 +84,55 @@ public class BSPNetUtils {
         Integer.valueOf(peerAddrParts[1]));
   }
 
+  /**
+   * Checks whether a TCP and a UDP socket can both be opened on a port.
+   * @param port the port to check for availability
+   * @return true if both sockets were opened, false if an IOException occurred
+   * @throws IllegalArgumentException if port is outside the permitted range
+   */
+  public static boolean available(int port) {
+    if (port < Constants.DEFAULT_PEER_PORT || port > MAX_PORT_NUMBER) {
+      throw new IllegalArgumentException("Invalid port: " + port);
+    }
+
+    ServerSocket ss = null;
+    DatagramSocket ds = null;
+    try {
+      ss = new ServerSocket(port);
+      ss.setReuseAddress(true); // NOTE(review): already bound; confirm effect
+      ds = new DatagramSocket(port);
+      ds.setReuseAddress(true);
+      return true;
+    } catch (IOException e) {
+      // Deliberately ignored: a failed probe is reported as "false" below.
+    } finally {
+      if (ds != null) {
+        ds.close();
+      }
+      if (ss != null) {
+        try {
+          ss.close();
+        } catch (IOException e) {
+          /* should not be thrown */
+        }
+      }
+    }
+    return false;
+  }
+
+  /** Returns the first port above fromPort that available(int) accepts. */
+  public static int getNextAvailable(int fromPort) {
+    if (fromPort < Constants.DEFAULT_PEER_PORT || fromPort > MAX_PORT_NUMBER) {
+      throw new IllegalArgumentException("Invalid start port: " + fromPort);
+    }
+    int candidate = fromPort + 1;
+    while (candidate <= MAX_PORT_NUMBER && !available(candidate)) {
+      candidate++;
+    }
+    if (candidate > MAX_PORT_NUMBER) {
+      throw new NoSuchElementException("Could not find an available port "
+          + "above " + fromPort);
+    }
+    return candidate;
+  }
 }


Reply via email to