Author: edwardyoon
Date: Thu Sep 22 01:43:56 2011
New Revision: 1173927
URL: http://svn.apache.org/viewvc?rev=1173927&view=rev
Log:
Fixed barrier problem.
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1173927&r1=1173926&r2=1173927&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Sep 22 01:43:56 2011
@@ -9,6 +9,7 @@ Release 0.4 - Unreleased
BUG FIXES
+ HAMA-387: Fixed barrier synchronization problem (ChiaHung Lin via edwardyoon)
HAMA-436: Web Interface does not update Superstep Count (Thomas Jungblut)
HAMA-429: Groom statuses should be reported periodically (ChiaHung Lin via edwardyoon)
HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1173927&r1=1173926&r2=1173927&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Thu Sep 22 01:43:56 2011
@@ -474,6 +474,9 @@ public class BSPMaster implements JobSub
public void clearZKNodes() {
try {
for (String node : zk.getChildren(bspRoot, this)) {
+ for (String subnode : zk.getChildren(bspRoot + "/" + node, this)) {
+ zk.delete(bspRoot + "/" + node + "/" + subnode, 0);
+ }
zk.delete(bspRoot + "/" + node, 0);
}
} catch (KeeperException e) {
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL:
http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1173927&r1=1173926&r2=1173927&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Sep 22 01:43:56 2011
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -49,6 +50,7 @@ import org.apache.hama.ipc.BSPPeerProtoc
import org.apache.hama.util.Bytes;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -59,7 +61,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
* This class represents a BSP peer.
*/
public class BSPPeer implements Watcher, BSPPeerInterface {
-
+
public static final Log LOG = LogFactory.getLog(BSPPeer.class);
private final Configuration conf;
@@ -304,7 +306,6 @@ public class BSPPeer implements Watcher,
@Override
public void sync() throws IOException, KeeperException, InterruptedException {
enterBarrier();
- long startTime = System.currentTimeMillis();
Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it =
this.outgoingQueues
.entrySet().iterator();
@@ -331,15 +332,10 @@ public class BSPPeer implements Watcher,
peer.put(bundle);
}
- if ((System.currentTimeMillis() - startTime) < 200) {
- Thread.sleep(200);
- }
-
leaveBarrier();
currentTaskStatus.incrementSuperstepCount();
umbilical.incrementSuperstepCount(taskid);
- startTime = System.currentTimeMillis();
// Clear outgoing queues.
clearOutgoingQueues();
@@ -351,53 +347,124 @@ public class BSPPeer implements Watcher,
// Switch local queues.
localQueue = localQueueForNextIteration;
localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+ }
+
+ private void createZnode(final String path) throws KeeperException,
+ InterruptedException {
+ createZnode(path, CreateMode.PERSISTENT);
+ }
+
+ private void createEphemeralZnode(final String path) throws KeeperException,
+ InterruptedException {
+ createZnode(path, CreateMode.EPHEMERAL);
+ }
- // TODO: This is a quite temporary solution of HAMA-387.
- // If zk.getChildren() response is slower than 200 milliseconds,
- // BSP system will be hanged.
-
- // We have to consider another way to avoid this problem.
- if ((System.currentTimeMillis() - startTime) < 200) {
- Thread.sleep(200); // at least wait
+ private void createZnode(final String path, final CreateMode mode) throws KeeperException,
+ InterruptedException {
+ Stat s = zk.exists(path, false);
+ if(null == s) {
+ try {
+ zk.create(path, null, Ids.OPEN_ACL_UNSAFE, mode);
+ } catch(KeeperException.NodeExistsException nee) {
+ LOG.warn("Ignore because znode may be already created at "+path, nee);
+ }
}
}
protected boolean enterBarrier() throws KeeperException,
InterruptedException {
- LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
- + this.getSuperstepCount());
- zk.create(getNodeName(), Bytes.toBytes(this.getSuperstepCount()),
- Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " +
+ this.getSuperstepCount());
+ }
- while (true) {
- synchronized (mutex) {
- List<String> list = zk.getChildren(bspRoot, true);
+ createZnode(bspRoot);
- if (list.size() < jobConf.getNumBspTask()) {
- mutex.wait();
- } else {
- return true;
+ final String pathToJobIdZnode =
+ bspRoot + "/" + taskid.getJobID().toString();
+ createZnode(pathToJobIdZnode);
+
+ final String pathToSuperstepZnode =
+ pathToJobIdZnode + "/" + getSuperstepCount();
+ createZnode(pathToSuperstepZnode);
+
+ zk.exists(pathToSuperstepZnode+"/ready", new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized(mutex) {
+ try {
+ Stat s = zk.exists(pathToSuperstepZnode+"/ready", false);
+ if(null != s) {
+ zk.delete(pathToSuperstepZnode+"/ready", 0);
+ }
+ } catch(KeeperException.NoNodeException nne) {
+ LOG.warn("Ignore because znode may be deleted.", nne);
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ }
+ mutex.notifyAll();
}
}
+ });
+ zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+
+ synchronized(mutex) {
+ List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ if(LOG.isDebugEnabled())
+ LOG.debug("enterBarrier() znode size within "+pathToSuperstepZnode+" is "+
+ znodes.size()+". Znodes include " +znodes);
+ if (znodes.size() < jobConf.getNumBspTask()) {
+ mutex.wait();
+ } else {
+ createEphemeralZnode(pathToSuperstepZnode+"/ready");
+ }
}
+ return true;
}
protected boolean leaveBarrier() throws KeeperException,
InterruptedException {
- zk.delete(getNodeName(), 0);
- while (true) {
+ final String pathToSuperstepZnode =
+ bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount();
+ while(true) {
synchronized (mutex) {
- List<String> list = zk.getChildren(bspRoot, true);
-
- if (list.size() > 0) {
- mutex.wait();
- } else {
+ final List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
+ final int size = znodes.size();
+ if(null == znodes || znodes.isEmpty()) return true;
+ if(1 == size) {
+ zk.delete(getNodeName(), 0);
return true;
}
+ Collections.sort(znodes);
+ final String lowest = znodes.get(0);
+ final String highest = znodes.get(size-1);
+ if (getNodeName().equals(pathToSuperstepZnode+"/"+lowest)) {
+ Stat s = zk.exists(pathToSuperstepZnode+"/"+highest, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized(mutex) {
+ mutex.notifyAll();
+ }
+ }
+ });
+ if(null != s) mutex.wait();
+ }else{
+ Stat s1 = zk.exists(getNodeName(), false);
+ if(null != s1) zk.delete(getNodeName(), 0);
+ Stat s2 = zk.exists(pathToSuperstepZnode+"/"+lowest, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ synchronized(mutex) {
+ mutex.notifyAll();
+ }
+ }
+ });
+ if(null != s2) mutex.wait();
+ }
}
}
}
private String getNodeName() {
- return bspRoot + "/" + taskid.getJobID().toString() + "_" + getPeerName();
+ return bspRoot + "/" + taskid.getJobID().toString() + "/" + getSuperstepCount() + "/" + taskid.toString() ;
}
@Override