MaoYuan Xian created HAMA-780:
---------------------------------
Summary: New launched child processes by fault tolerance may not
be able to contact each other
Key: HAMA-780
URL: https://issues.apache.org/jira/browse/HAMA-780
Project: Hama
Issue Type: Bug
Components: bsp core
Affects Versions: 0.6.2
Reporter: MaoYuan Xian
When fault tolerance is enabled, the recovery process sometimes fails because
the newly launched child processes cannot send messages to each other.
I finally found the cause:
On the one hand, when a new child process is launched for recovery, its port is
set via the following logic:
{code}
final BSPTask task = (BSPTask) umbilical.getTask(taskid);
int peerPort = umbilical.getAssignedPortNum(taskid);
...
defaultConf.setInt(Constants.PEER_PORT, peerPort);
{code}
This logic finds the lowest available port for the newly launched process:
{code}
public static int getNextAvailable(int fromPort) {
  ...
  for (int i = fromPort + 1; i <= MAX_PORT_NUMBER; i++) {
    if (available(i)) {
      return i;
    }
  }
  ...
}
{code}
Here is an example:
Run one job with 3 child tasks, listening on hostname:61001, hostname:61002,
and hostname:61003.
Suppose the task listening on hostname:61002 fails (because of a disk problem,
or because it is killed by the system's memory protection program). Port 61002
is now released.
The recovery process starts and triggers three new processes, which are
assigned the addresses hostname:61002, hostname:61004, and hostname:61005
(61001 and 61003 are still held by the old child tasks until they quit).
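The port assignment in this example can be reproduced with a small standalone
sketch. It is not Hama code: the set of in-use ports stands in for the real
available(i) socket check, the class and the assignPorts helper are
hypothetical, and the base port 61000 and MAX_PORT_NUMBER = 65535 are
assumptions inferred from the addresses above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class PortAssignmentSketch {
    // Assumed upper bound; the real constant is not shown in the issue.
    static final int MAX_PORT_NUMBER = 65535;

    // Same scan as in the issue: first free port strictly above fromPort.
    // A set of in-use ports replaces the real available(i) check.
    static int getNextAvailable(int fromPort, Set<Integer> inUse) {
        for (int i = fromPort + 1; i <= MAX_PORT_NUMBER; i++) {
            if (!inUse.contains(i)) {
                return i;
            }
        }
        throw new IllegalStateException("no free port above " + fromPort);
    }

    // Hypothetical helper: assign ports to `count` recovery tasks while the
    // surviving old tasks still hold theirs.
    static List<Integer> assignPorts(int fromPort, Set<Integer> heldByOldTasks,
                                     int count) {
        Set<Integer> inUse = new TreeSet<>(heldByOldTasks);
        List<Integer> assigned = new ArrayList<>();
        for (int n = 0; n < count; n++) {
            int port = getNextAvailable(fromPort, inUse);
            inUse.add(port); // the new task now occupies this port
            assigned.add(port);
        }
        return assigned;
    }

    public static void main(String[] args) {
        // 61002 was released by the failed task; 61001 and 61003 are still held.
        System.out.println(assignPorts(61000, Set.of(61001, 61003), 3));
        // prints [61002, 61004, 61005]
    }
}
```

Because the scan always restarts from the same base port, a released port is
reused immediately while the ports of the still-running old tasks are skipped,
which is how old and new addresses end up interleaved.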
During this recovery phase, the /bsp/job_id/peers directory in ZooKeeper looks
something like:
{quote}
hostname:61001, hostname:61002, hostname:61005, hostname:61003, hostname:61004
{quote}
On the other hand, newly launched child processes try to find each other through
ZooKeeper when they are launched (in BSPPeerImpl.java):
{code}
private final void initPeerNames() {
  if (allPeers == null) {
    allPeers = syncClient.getAllPeerNames(taskId);
  }
}
{code}
{code}
public String[] getAllPeerNames(TaskAttemptID taskId) {
  if (allPeers == null) {
    TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
    try {
      List<String> var = zk.getChildren(
          constructKey(taskId.getJobID(), "peers"), this);
      allPeers = var.toArray(new String[var.size()]);
      for (String s : allPeers) {
        ...
        boolean result = getValueFromBytes(data, thatTask);
        if (result) {
          LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
              + thatTask.getTaskID().getId() + " : " + s);
          sortedMap.put(thatTask.getTaskID().getId(), s);
        }
      }
    } catch (Exception e) {
      LOG.error(e);
      throw new RuntimeException("All peer names could not be retrieved!");
    }
    ...
}
{code}
The log shows the following:
{quote}
13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from
zookeeper: attempt_201307122024_0005_000001_0 ID:1 : hostname:61001
13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from
zookeeper: attempt_201307122024_0005_000000_1 ID:0 : hostname:61002
13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from
zookeeper: attempt_201307122024_0005_000002_1 ID:2 : hostname:61005
13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from
zookeeper: attempt_201307122024_0005_000002_0 ID:2 : hostname:61003
13/07/13 00:03:39 DEBUG sync.ZooKeeperSyncClientImpl: TASK mapping from
zookeeper: attempt_201307122024_0005_000001_1 ID:1 : hostname:61004
{quote}
The newly added peer hostname:61005 is listed before the stale peer
hostname:61003, and both belong to task ID 2. Because
sortedMap.put(thatTask.getTaskID().getId(), s) is keyed by task ID only, the
later entry overwrites the earlier one, and the sortedMap in
ZooKeeperSyncClientImpl ends up holding <2, hostname:61003>. The next round of
inter-process communication then malfunctions, because messages that should be
sent to "hostname:61005" are sent to "hostname:61003" instead.
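The overwrite can be shown in isolation with a minimal sketch that replays the
five log entries above. It is not Hama code: PeerEntry and both methods are
hypothetical, and the attempt number is assumed to be the trailing field of the
logged attempt IDs (_0 or _1). The second method illustrates one possible
mitigation, keeping only the newest attempt per task ID. It is not a fix
proposed in this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PeerMappingSketch {
    // Hypothetical stand-in for one child node under /bsp/job_id/peers.
    static final class PeerEntry {
        final int taskId;
        final int attempt;
        final String address;

        PeerEntry(int taskId, int attempt, String address) {
            this.taskId = taskId;
            this.attempt = attempt;
            this.address = address;
        }
    }

    // The five entries from the log, in the order they were listed.
    static List<PeerEntry> entriesFromLog() {
        List<PeerEntry> entries = new ArrayList<>();
        entries.add(new PeerEntry(1, 0, "hostname:61001"));
        entries.add(new PeerEntry(0, 1, "hostname:61002"));
        entries.add(new PeerEntry(2, 1, "hostname:61005"));
        entries.add(new PeerEntry(2, 0, "hostname:61003"));
        entries.add(new PeerEntry(1, 1, "hostname:61004"));
        return entries;
    }

    // Behavior described in the issue: keyed by task ID only, so whichever
    // entry is seen last wins, regardless of which attempt it belongs to.
    static Map<Integer, String> lastWriteWins(List<PeerEntry> entries) {
        Map<Integer, String> sortedMap = new TreeMap<>();
        for (PeerEntry e : entries) {
            sortedMap.put(e.taskId, e.address);
        }
        return sortedMap;
    }

    // Possible mitigation (an assumption, not the project's fix): keep the
    // entry with the highest attempt number for each task ID.
    static Map<Integer, String> newestAttemptWins(List<PeerEntry> entries) {
        Map<Integer, PeerEntry> newest = new TreeMap<>();
        for (PeerEntry e : entries) {
            PeerEntry seen = newest.get(e.taskId);
            if (seen == null || e.attempt > seen.attempt) {
                newest.put(e.taskId, e);
            }
        }
        Map<Integer, String> sortedMap = new TreeMap<>();
        for (PeerEntry e : newest.values()) {
            sortedMap.put(e.taskId, e.address);
        }
        return sortedMap;
    }

    public static void main(String[] args) {
        System.out.println(lastWriteWins(entriesFromLog()));
        // prints {0=hostname:61002, 1=hostname:61004, 2=hostname:61003}
        System.out.println(newestAttemptWins(entriesFromLog()));
        // prints {0=hostname:61002, 1=hostname:61004, 2=hostname:61005}
    }
}
```

Note that task ID 1 only resolves correctly in the first map because its new
attempt happens to be listed last. The result depends on the order in which
ZooKeeper returns the children, which would explain why the failure is
intermittent.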