Author: edwardyoon
Date: Wed Dec 21 07:49:05 2011
New Revision: 1221636
URL: http://svn.apache.org/viewvc?rev=1221636&view=rev
Log:
[HAMA-489] Check port availability before forking child process
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
Modified: incubator/hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1221636&r1=1221635&r2=1221636&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Dec 21 07:49:05 2011
@@ -16,6 +16,7 @@ Release 0.4 - Unreleased
BUG FIXES
+ HAMA-489: Check port availability before forking child process (edwardyoon)
HAMA-474: ClusterStatus.getTasks() always returns 0 (edwardyoon)
HAMA-472: The task should be killed if it fails to initialize (edwardyoon)
HAMA-465: LocalJobRunner should support combiners and IO (tjungblut)
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1221636&r1=1221635&r2=1221636&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
Wed Dec 21 07:49:05 2011
@@ -57,6 +57,7 @@ import org.apache.hama.bsp.sync.SyncExce
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.MasterProtocol;
+import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.log4j.LogManager;
import org.apache.zookeeper.WatchedEvent;
@@ -146,19 +147,14 @@ public class GroomServer implements Runn
LOG.info("Launch " + actions.length + " tasks.");
assignedPeerNames = new HashMap<TaskAttemptID, Integer>();
- int i = 0;
+ int prevPort = Constants.DEFAULT_PEER_PORT;
- // TODO find another way to manage all activate peers.
for (GroomServerAction action : actions) {
Task t = ((LaunchTaskAction) action).getTask();
- int peerPort = (Constants.DEFAULT_PEER_PORT + i);
- assignedPeerNames.put(t.getTaskID(), peerPort);
-
- i++;
- }
-
- for (GroomServerAction action : actions) {
+ prevPort = BSPNetUtils.getNextAvailable(prevPort);
+ assignedPeerNames.put(t.getTaskID(), prevPort);
+
if (action instanceof LaunchTaskAction) {
startNewTask((LaunchTaskAction) action);
} else {
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1221636&r1=1221635&r2=1221636&view=diff
==============================================================================
---
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
(original)
+++
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
Wed Dec 21 07:49:05 2011
@@ -17,16 +17,22 @@
*/
package org.apache.hama.util;
+import java.io.IOException;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.UnknownHostException;
+import java.util.NoSuchElementException;
+import org.apache.hama.Constants;
import org.apache.mina.util.AvailablePortFinder;
/**
* NetUtils for our needs.
*/
public class BSPNetUtils {
+ public static final int MAX_PORT_NUMBER = 65535;
/**
* Gets the canonical hostname of this machine.
@@ -78,4 +84,55 @@ public class BSPNetUtils {
Integer.valueOf(peerAddrParts[1]));
}
+ /**
+ * Checks to see if a specific port is available.
+ *
+ * @param port the port to check for availability
+ */
+ public static boolean available(int port) {
+ if (port < Constants.DEFAULT_PEER_PORT || port > MAX_PORT_NUMBER) {
+ throw new IllegalArgumentException("Invalid start port: " + port);
+ }
+
+ ServerSocket ss = null;
+ DatagramSocket ds = null;
+ try {
+ ss = new ServerSocket(port);
+ ss.setReuseAddress(true);
+ ds = new DatagramSocket(port);
+ ds.setReuseAddress(true);
+ return true;
+ } catch (IOException e) {
+ } finally {
+ if (ds != null) {
+ ds.close();
+ }
+
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (IOException e) {
+ /* should not be thrown */
+ }
+ }
+ }
+
+ return false;
+ }
+
+ public static int getNextAvailable(int fromPort) {
+ if ((fromPort < Constants.DEFAULT_PEER_PORT)
+ || (fromPort > MAX_PORT_NUMBER)) {
+ throw new IllegalArgumentException("Invalid start port: " + fromPort);
+ }
+
+ for (int i = fromPort + 1; i <= MAX_PORT_NUMBER; i++) {
+ if (available(i)) {
+ return i;
+ }
+ }
+
+ throw new NoSuchElementException("Could not find an available port "
+ + "above " + fromPort);
+ }
}