Repository: giraph
Updated Branches:
  refs/heads/trunk e004359a1 -> c1d50bca9


GIRAPH-850


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1d50bca
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1d50bca
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1d50bca

Branch: refs/heads/trunk
Commit: c1d50bca96043c8a3e1ed1acfec98f92d03f6864
Parents: e004359
Author: Claudio Martella <[email protected]>
Authored: Wed Feb 19 14:07:25 2014 +0100
Committer: Claudio Martella <[email protected]>
Committed: Wed Feb 19 14:07:25 2014 +0100

----------------------------------------------------------------------
 .../apache/giraph/conf/GiraphConfiguration.java |  9 ----
 .../org/apache/giraph/conf/GiraphConstants.java |  3 --
 .../apache/giraph/graph/GraphTaskManager.java   | 56 ++++++--------------
 .../apache/giraph/yarn/GiraphYarnClient.java    |  1 -
 .../org/apache/giraph/zk/ZooKeeperManager.java  | 28 ++++++----
 5 files changed, 33 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ea1f12d..358fe44 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -800,15 +800,6 @@ public class GiraphConfiguration extends Configuration
     return ZOOKEEPER_SERVER_COUNT.get(this);
   }
 
-  /**
-   * Set the ZooKeeper jar classpath
-   *
-   * @param classPath Classpath for the ZooKeeper jar
-   */
-  public void setZooKeeperJar(String classPath) {
-    set(ZOOKEEPER_JAR, classPath);
-  }
-
   public int getZooKeeperSessionTimeout() {
     return ZOOKEEPER_SESSION_TIMEOUT.get(this);
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 732827c..8afe101 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -446,9 +446,6 @@ public interface GiraphConstants {
   IntConfOption ZOOKEEPER_SERVER_PORT =
       new IntConfOption("giraph.zkServerPort", 22181, "ZooKeeper port to use");
 
-  /** Location of the ZooKeeper jar - Used internally, not meant for users */
-  String ZOOKEEPER_JAR = "giraph.zkJar";
-
   /** Local ZooKeeper directory to use */
   String ZOOKEEPER_DIR = "giraph.zkDir";
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 573bd95..7ab291b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -211,7 +211,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     conf.createComputationFactory().initialize(conf);
     // Do some task setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
-    locateZookeeperClasspath(zkPathList);
     String serverPortList = conf.getZookeeperList();
     if (serverPortList.isEmpty()) {
       if (startZooKeeperManager()) {
@@ -581,44 +580,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   }
 
   /**
-   * Attempt to locate the local copies of the ZK jar files, assuming
-   * the underlying cluster framework has provided them for us.
-   * @param fileClassPaths the path to the ZK jars on the local cluster.
-   */
-  private void locateZookeeperClasspath(Path[] fileClassPaths)
-    throws IOException {
-    String zkClasspath = null;
-    if (fileClassPaths == null) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Distributed cache is empty. Assuming fatjar.");
-      }
-      String jarFile = context.getJar();
-      if (jarFile == null) {
-        jarFile = findContainingJar(getClass());
-      }
-      // Pure YARN profiles will use unpacked resources, so calls
-      // to "findContainingJar()" in that context can return NULL!
-      zkClasspath = null == jarFile ?
-          "./*" : jarFile.replaceFirst("file:", "");
-    } else {
-      StringBuilder sb = new StringBuilder();
-      sb.append(fileClassPaths[0]);
-
-      for (int i = 1; i < fileClassPaths.length; i++) {
-        sb.append(":");
-        sb.append(fileClassPaths[i]);
-      }
-      zkClasspath = sb.toString();
-    }
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("setup: classpath @ " + zkClasspath + " for job " +
-          context.getJobName());
-    }
-    conf.setZooKeeperJar(zkClasspath);
-  }
-
-  /**
    * Initialize the root logger and appender to the settings in conf.
    */
   private void initializeAndConfigureLogging() {
@@ -884,14 +845,29 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     try {
       if (masterThread != null) {
         masterThread.join();
+        LOG.info("cleanup: Joined with master thread");
       }
     } catch (InterruptedException e) {
       // cleanup phase -- just log the error
       LOG.error("cleanup: Master thread couldn't join");
     }
     if (zkManager != null) {
-      zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
+      LOG.info("cleanup: Offlining ZooKeeper servers");
+      try {
+        zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED);
+      // We need this here because apparently exceptions are eaten by Hadoop
+      // when they come from the cleanup lifecycle and it's useful to know
+      // if something is wrong.
+      //
+      // And since it's cleanup nothing too bad should happen if we don't
+      // propagate and just allow the job to finish normally.
+      // CHECKSTYLE: stop IllegalCatch
+      } catch (Throwable e) {
+      // CHECKSTYLE: resume IllegalCatch
+        LOG.error("cleanup: Error offlining zookeeper", e);
+      }
     }
+
     // Stop tracking metrics
     GiraphMetrics.get().shutdown();
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
index 70166b6..9f5924d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -113,7 +113,6 @@ public class GiraphYarnClient {
    * @return true if job is successful
    */
   public boolean run(final boolean verbose) throws YarnException, IOException {
-    checkJobLocalZooKeeperSupported();
     // init our connection to YARN ResourceManager RPC
     LOG.info("Running Client");
     yarnClient.start();

http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index 66f627f..348580c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -633,14 +633,13 @@ public class ZooKeeperManager {
             "onlineZooKeeperServers: java.home is not set!");
       }
       commandList.add(javaHome + "/bin/java");
+      commandList.add("-cp");
+      commandList.add(System.getProperty("java.class.path"));
       String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf);
       String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
       if (zkJavaOptsArray != null) {
         commandList.addAll(Arrays.asList(zkJavaOptsArray));
       }
-      commandList.add("-cp");
-      Path fullJarPath = new Path(conf.get(GiraphConstants.ZOOKEEPER_JAR));
-      commandList.add(fullJarPath.toString());
       commandList.add(QuorumPeerMain.class.getName());
       commandList.add(configFilePath);
       processBuilder.command(commandList);
@@ -806,18 +805,18 @@ public class ZooKeeperManager {
   }
 
   /**
-   * Wait for all map tasks to signal completion.  Will wait up to
+   * Wait for all workers to signal completion.  Will wait up to
    * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before
    * reporting an error.
    *
-   * @param totalMapTasks Number of map tasks to wait for
+   * @param totalWorkers Number of workers to wait for
    */
-  private void waitUntilAllTasksDone(int totalMapTasks) {
+  private void waitUntilAllTasksDone(int totalWorkers) {
     int attempt = 0;
     long maxMs = time.getMilliseconds() +
         conf.getWaitTaskDoneTimeoutMs();
     while (true) {
-      boolean[] taskDoneArray = new boolean[totalMapTasks];
+      boolean[] taskDoneArray = new boolean[totalWorkers];
       try {
         FileStatus [] fileStatusArray =
             fs.listStatus(taskDirectory);
@@ -834,12 +833,12 @@ public class ZooKeeperManager {
         }
         if (LOG.isInfoEnabled()) {
           LOG.info("waitUntilAllTasksDone: Got " + totalDone +
-              " and " + totalMapTasks +
+              " and " + totalWorkers +
               " desired (polling period is " +
               pollMsecs + ") on attempt " +
               attempt);
         }
-        if (totalDone >= totalMapTasks) {
+        if (totalDone >= totalWorkers) {
           break;
         } else {
           StringBuilder sb = new StringBuilder();
@@ -882,8 +881,15 @@ public class ZooKeeperManager {
     }
     synchronized (this) {
       if (zkProcess != null) {
-        int totalMapTasks = conf.getMapTasks();
-        waitUntilAllTasksDone(totalMapTasks);
+        boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
+        int totalWorkers = conf.getMapTasks();
+        // A Yarn job always spawns MAX_WORKERS + 1 containers
+        if (isYarnJob) {
+          totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
+        }
+        LOG.info("offlineZooKeeperServers: Will wait for " +
+            totalWorkers + " tasks");
+        waitUntilAllTasksDone(totalWorkers);
         zkProcess.destroy();
         int exitValue = -1;
         File zkDirFile;

Reply via email to