[ 
https://issues.apache.org/jira/browse/HAMA-431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13125860#comment-13125860
 ] 

Thomas Jungblut commented on HAMA-431:
--------------------------------------

You're right. 
Note that we get fault tolerance for the YARN sync for free: because it is part 
of the app master, it can simply be restarted. 
And "easy to use" is a joke, isn't it?

This:
{noformat}
protected boolean enterBarrier() throws KeeperException, InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("[" + getPeerName() + "] enter the enterbarrier: "
          + this.getSuperstepCount());
    }

    synchronized (zk) {
      createZnode(bspRoot);
      final String pathToJobIdZnode = bspRoot + "/"
          + taskid.getJobID().toString();
      createZnode(pathToJobIdZnode);
      final String pathToSuperstepZnode = pathToJobIdZnode + "/"
          + getSuperstepCount();
      createZnode(pathToSuperstepZnode);
      BarrierWatcher barrierWatcher = new BarrierWatcher();
      Stat readyStat = zk.exists(pathToSuperstepZnode + "/ready",
          barrierWatcher);
      zk.create(getNodeName(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

      List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
      int size = znodes.size(); // may contains ready
      boolean hasReady = znodes.contains("ready");
      if (hasReady) {
        size--;
      }

      LOG.debug("===> at superstep :" + getSuperstepCount()
          + " current znode size: " + znodes.size() + " current znodes:"
          + znodes);

      if (LOG.isDebugEnabled())
        LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
            + " is " + znodes.size() + ". Znodes include " + znodes);

      if (size < jobConf.getNumBspTask()) {
        LOG.info("xxxx 1. At superstep: " + getSuperstepCount()
            + " which task is waiting? " + taskid.toString()
            + " stat is null? " + readyStat);
        while (!barrierWatcher.isComplete()) {
          if (!hasReady) {
            synchronized (mutex) {
              mutex.wait(1000);
            }
          }
        }
        LOG.debug("xxxx 2. at superstep: " + getSuperstepCount()
            + " after waiting ..." + taskid.toString());
      } else {
        LOG.debug("---> at superstep: " + getSuperstepCount()
            + " task that is creating /ready znode:" + taskid.toString());
        createEphemeralZnode(pathToSuperstepZnode + "/ready");
      }
    }
    return true;
  }
{noformat}

is anything but an easy way to use ZooKeeper. And it does not work 
correctly: it throws exceptions the whole time.
Even if you set the logging aside, it is just a concurrency nightmare.

{noformat}
And again, whatever we chose, it should be designed as a common module.
{noformat}

I suggest making BSPPeer (or BSPPeerImpl, whatever it is called now) an 
abstract class with two subclasses: a ZooKeeper sync peer and an RPC sync peer. 
Let the user decide. 
I think this is just a discussion between "I don't like ZooKeeper" and "all 
other projects use it". That is not something that will lead us towards a 
solution anyway.
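
A rough sketch of that split might look like the following. Every name here (AbstractSyncPeer, ZooKeeperSyncPeer, RpcSyncPeer, the create factory, and the "zookeeper"/"rpc" selector strings) is made up for illustration and is not Hama's actual API. The barrier bodies are placeholders that only record what they would do, and checked exceptions are left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncPeerSketch {

  /** Hypothetical base class: owns the superstep bookkeeping, delegates the barrier. */
  abstract static class AbstractSyncPeer {
    private final String peerName;
    private long superstepCount = 0;
    // Records barrier calls so the sketch can be inspected without a cluster.
    protected final List<String> trace = new ArrayList<>();

    AbstractSyncPeer(String peerName) {
      this.peerName = peerName;
    }

    String getPeerName() { return peerName; }
    long getSuperstepCount() { return superstepCount; }
    List<String> getTrace() { return trace; }

    /** Template method: the superstep protocol is the same for every backend. */
    final void sync() {
      enterBarrier();
      // Message exchange between peers would happen here.
      leaveBarrier();
      superstepCount++;
    }

    // A real implementation would declare the exceptions its backend can throw.
    protected abstract void enterBarrier();
    protected abstract void leaveBarrier();
  }

  /** Placeholder: a real version would create and watch znodes here. */
  static final class ZooKeeperSyncPeer extends AbstractSyncPeer {
    ZooKeeperSyncPeer(String peerName) { super(peerName); }

    @Override protected void enterBarrier() { trace.add("zk:enter:" + getSuperstepCount()); }
    @Override protected void leaveBarrier() { trace.add("zk:leave:" + getSuperstepCount()); }
  }

  /** Placeholder: a real version would call a sync service over RPC here. */
  static final class RpcSyncPeer extends AbstractSyncPeer {
    RpcSyncPeer(String peerName) { super(peerName); }

    @Override protected void enterBarrier() { trace.add("rpc:enter:" + getSuperstepCount()); }
    @Override protected void leaveBarrier() { trace.add("rpc:leave:" + getSuperstepCount()); }
  }

  /** "Let the user decide": pick the backend from a (hypothetical) config value. */
  static AbstractSyncPeer create(String syncImpl, String peerName) {
    switch (syncImpl) {
      case "zookeeper":
        return new ZooKeeperSyncPeer(peerName);
      case "rpc":
        return new RpcSyncPeer(peerName);
      default:
        throw new IllegalArgumentException("Unknown sync implementation: " + syncImpl);
    }
  }

  public static void main(String[] args) {
    AbstractSyncPeer peer = create(args.length > 0 ? args[0] : "rpc", "peer-0");
    peer.sync();
    peer.sync();
    System.out.println(peer.getPeerName() + " finished superstep "
        + peer.getSuperstepCount() + ", trace: " + peer.getTrace());
  }
}
```

The point of the template method is that the rest of the peer only ever calls sync(), so swapping the barrier backend would not touch user code.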
                
> MapReduce NG integration
> ------------------------
>
>                 Key: HAMA-431
>                 URL: https://issues.apache.org/jira/browse/HAMA-431
>             Project: Hama
>          Issue Type: New Feature
>            Reporter: Thomas Jungblut
>            Assignee: Thomas Jungblut
>         Attachments: WelcomeOnYarn.png, job_state.dot, task_phase.dot, 
> task_state.dot
>
>
> We should take a look at how to integrate Hama's BSP Engine to Hadoop's 
> nextGen application platform.
> Can be currently found in the 0.23 branch.
