Author: edwardyoon
Date: Thu Feb 6 12:49:10 2014
New Revision: 1565212
URL: http://svn.apache.org/r1565212
Log:
Handling BindException.
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb
6 12:49:10 2014
@@ -164,6 +164,7 @@ public final class BSPPeerImpl<K1, V1, K
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
+
peerAddress = new InetSocketAddress(bindAddress, bindPort);
// This function call may change the current peer address
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Thu Feb
6 12:49:10 2014
@@ -125,6 +125,7 @@ public class GroomServer implements Runn
/** Map from taskId -> TaskInProgress. */
Map<TaskAttemptID, TaskInProgress> runningTasks = null;
Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
+ Map<TaskAttemptID, Integer> assignedPeerNames = null;
Map<BSPJobID, RunningJob> runningJobs = null;
// new nexus between GroomServer and BSPMaster
@@ -157,9 +158,17 @@ public class GroomServer implements Runn
}
if (actions != null) {
+ int prevPort = Constants.DEFAULT_PEER_PORT;
for (GroomServerAction action : actions) {
if (action instanceof LaunchTaskAction) {
+ Task t = ((LaunchTaskAction) action).getTask();
+
+ synchronized (assignedPeerNames) {
+ prevPort = BSPNetUtils.getNextAvailable(prevPort);
+ assignedPeerNames.put(t.getTaskID(), prevPort);
+ }
+
LOG.info("Launch " + actions.length + " tasks.");
startNewTask((LaunchTaskAction) action);
} else if (action instanceof KillTaskAction) {
@@ -172,6 +181,7 @@ public class GroomServer implements Runn
tip.killAndCleanup(false);
tasks.remove(killAction.getTaskID());
runningTasks.remove(killAction.getTaskID());
+ assignedPeerNames.remove(killAction.getTaskID());
} catch (IOException ioe) {
throw new DirectiveException("Error when killing a "
+ "TaskInProgress.", ioe);
@@ -181,6 +191,12 @@ public class GroomServer implements Runn
LOG.info("Recovery action task.");
RecoverTaskAction recoverAction = (RecoverTaskAction) action;
Task t = recoverAction.getTask();
+
+ synchronized (assignedPeerNames) {
+ prevPort = BSPNetUtils.getNextAvailable(prevPort);
+ assignedPeerNames.put(t.getTaskID(), prevPort);
+ }
+
LOG.info("Recovery action task." + t.getTaskID());
try {
startRecoveryTask(recoverAction);
@@ -312,6 +328,8 @@ public class GroomServer implements Runn
this.conf.set(Constants.PEER_HOST, localHostname);
this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
this.maxCurrentTasks = conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3);
+ this.assignedPeerNames = new HashMap<TaskAttemptID, Integer>(
+ 2 * this.maxCurrentTasks);
int rpcPort = -1;
String rpcAddr = null;
@@ -326,8 +344,6 @@ public class GroomServer implements Runn
this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
this.workerServer.start();
this.rpcServer = rpcAddr + ":" + rpcPort;
-
- LOG.info("Worker rpc server --> " + rpcServer);
}
server = new HttpServer("groomserver", rpcAddr, conf.getInt(
@@ -362,7 +378,7 @@ public class GroomServer implements Runn
this.taskReportAddress = taskReportServer.getListenerAddress();
this.conf.set("bsp.groom.report.address", taskReportAddress.getHostName()
+ ":" + taskReportAddress.getPort());
- LOG.info("GroomServer up at: " + this.taskReportAddress);
+ LOG.info("TaskReportServer up at: " + this.taskReportAddress);
this.groomHostName = rpcAddr;
this.groomServerName = "groomd_" + this.rpcServer.replace(':', '_');
@@ -1198,8 +1214,7 @@ public class GroomServer implements Runn
defaultConf);
final BSPTask task = (BSPTask) umbilical.getTask(taskid);
- int peerPort = Constants.DEFAULT_PEER_PORT;
- peerPort = BSPNetUtils.getNextAvailable(peerPort);
+ int peerPort = umbilical.getAssignedPortNum(taskid);
defaultConf.addResource(new Path(task.getJobFile()));
BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());
@@ -1212,7 +1227,7 @@ public class GroomServer implements Runn
long superstep = Long.parseLong(args[4]);
TaskStatus.State state = TaskStatus.State.valueOf(args[5]);
- LOG.debug("Starting peer for sstep " + superstep + " state = " + state);
+ LOG.debug("Starting peer for step " + superstep + " state = " + state);
try {
// use job-specified working directory
@@ -1334,6 +1349,29 @@ public class GroomServer implements Runn
}
+ /**
+ * Returns the peer port recorded for the given task attempt.
+ *
+ * @param taskid the task attempt whose port is requested
+ * @return the recorded port, or the next available port counted from
+ * Constants.DEFAULT_PEER_PORT when no entry exists for taskid
+ */
@Override
+ public int getAssignedPortNum(TaskAttemptID taskid) {
+ Integer assignedPort;
+ // Read under the same monitor the writers hold when they put entries.
+ synchronized (assignedPeerNames) {
+ assignedPort = assignedPeerNames.get(taskid);
+ }
+
+ if (assignedPort == null) {
+ // Unboxing a missing entry would throw a NullPointerException.
+ LOG.warn("No peer port assigned for " + taskid + ", probing for a free one.");
+ return BSPNetUtils.getNextAvailable(Constants.DEFAULT_PEER_PORT);
+ }
+ return assignedPort;
+ }
+
+ @Override
public void process(WatchedEvent event) {
// do nothing
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu
Feb 6 12:49:10 2014
@@ -408,6 +408,12 @@ public class LocalBSPRunner implements J
throws IOException, InterruptedException {
return true;
}
+
+ @Override
+ public int getAssignedPortNum(TaskAttemptID taskid) {
+ // Always returns 0; NOTE(review): confirm no caller needs a real port here.
+ return 0;
+ }
}
public static class LocalSyncClient extends BSPPeerSyncClient {
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
Thu Feb 6 12:49:10 2014
@@ -18,6 +18,7 @@
package org.apache.hama.bsp.message;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.Map;
@@ -25,7 +26,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
@@ -75,20 +75,50 @@ public final class HamaMessageManagerImp
private final void startRPCServer(Configuration conf,
InetSocketAddress peerAddress) {
try {
- String bindAddress = conf.get(Constants.PEER_HOST,
- Constants.DEFAULT_PEER_HOST);
- InetSocketAddress selfAddress = new InetSocketAddress(bindAddress, 0);
-
- // TODO Make number of RPC Server threads configurable
- this.server = RPC.getServer(this, selfAddress.getHostName(),
- selfAddress.getPort(), conf.getInt("hama.default.messenger.handler.threads.num", 5), false, conf);
+ startServer(peerAddress.getHostName(), peerAddress.getPort());
+ } catch (IOException ioe) {
+ LOG.error("Fail to start RPC server!", ioe);
+ // Chain the cause so callers can see why the launch failed.
+ throw new RuntimeException("RPC Server could not be launched!", ioe);
+ }
+ }
+
+ /**
+ * Starts the RPC server on hostName, trying port first and then the
+ * following ports, one at a time, while binding fails.
+ *
+ * @param hostName host name the server binds to
+ * @param port first port to try
+ * @throws IOException if the server cannot be created or started; the
+ * last BindException is rethrown once every retry has been used
+ */
+ private void startServer(String hostName, int port) throws IOException {
+ // Number of further ports tried after the requested one.
+ final int maxRetries = 5;
+ int candidatePort = port;
+ int retries = 0;
+
+ // A loop (not recursion) so one counter bounds every attempt.
+ while (true) {
+ try {
+ this.server = RPC.getServer(this, hostName, candidatePort,
+ conf.getInt("hama.default.messenger.handler.threads.num", 5), false,
+ conf);
+
server.start();
-
- LOG.info(" BSPPeer address:" + server.getListenerAddress().getHostName()
+ LOG.info("BSPPeer address:" + server.getListenerAddress().getHostName()
+ " port:" + server.getListenerAddress().getPort());
- } catch (IOException e) {
- LOG.error("Fail to start RPC server!", e);
- throw new RuntimeException("RPC Server could not be launched!");
+ return;
+ } catch (BindException e) {
+ if (retries >= maxRetries) {
+ // Out of retries: hand the failure to startRPCServer.
+ throw e;
+ }
+ retries++;
+ candidatePort++;
+ String nextAddress = hostName + ":" + candidatePort;
+ LOG.warn("Address already in use. Retrying " + nextAddress);
+ }
}
}
@@ -111,7 +141,7 @@ public final class HamaMessageManagerImp
if (compressor != null
&& (bundle.getApproximateSize() > conf.getLong(
"hama.messenger.compression.threshold", 1048576))) {
-
+
BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
bspPeerConnection.put(compMsgBundle);
peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L);
Modified: hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Thu
Feb 6 12:49:10 2014
@@ -67,4 +67,6 @@ public interface BSPPeerProtocol extends
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException;
+ int getAssignedPortNum(TaskAttemptID taskid);
+
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPTaskFaults.java
Thu Feb 6 12:49:10 2014
@@ -159,6 +159,12 @@ public class TestBSPTaskFaults extends T
lastPingTime = 0L;
}
}
+
+ @Override
+ public int getAssignedPortNum(TaskAttemptID taskid) {
+ // Test stub: always returns 0 regardless of the task attempt passed in.
+ return 0;
+ }
}
@SuppressWarnings("unused")
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL:
http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1565212&r1=1565211&r2=1565212&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
(original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
Thu Feb 6 12:49:10 2014
@@ -405,4 +405,9 @@ public class BSPApplicationMaster implem
}
+ @Override
+ public int getAssignedPortNum(TaskAttemptID taskid) {
+ return 0; // NOTE(review): always 0 here; confirm callers do not need a real port.
+ }
+
}