Repository: giraph Updated Branches: refs/heads/trunk e004359a1 -> c1d50bca9
GIRAPH-850 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c1d50bca Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c1d50bca Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c1d50bca Branch: refs/heads/trunk Commit: c1d50bca96043c8a3e1ed1acfec98f92d03f6864 Parents: e004359 Author: Claudio Martella <[email protected]> Authored: Wed Feb 19 14:07:25 2014 +0100 Committer: Claudio Martella <[email protected]> Committed: Wed Feb 19 14:07:25 2014 +0100 ---------------------------------------------------------------------- .../apache/giraph/conf/GiraphConfiguration.java | 9 ---- .../org/apache/giraph/conf/GiraphConstants.java | 3 -- .../apache/giraph/graph/GraphTaskManager.java | 56 ++++++-------------- .../apache/giraph/yarn/GiraphYarnClient.java | 1 - .../org/apache/giraph/zk/ZooKeeperManager.java | 28 ++++++---- 5 files changed, 33 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index ea1f12d..358fe44 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -800,15 +800,6 @@ public class GiraphConfiguration extends Configuration return ZOOKEEPER_SERVER_COUNT.get(this); } - /** - * Set the ZooKeeper jar classpath - * - * @param classPath Classpath for the ZooKeeper jar - */ - public void setZooKeeperJar(String classPath) { - set(ZOOKEEPER_JAR, classPath); - } - public int getZooKeeperSessionTimeout() { return ZOOKEEPER_SESSION_TIMEOUT.get(this); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 732827c..8afe101 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -446,9 +446,6 @@ public interface GiraphConstants { IntConfOption ZOOKEEPER_SERVER_PORT = new IntConfOption("giraph.zkServerPort", 22181, "ZooKeeper port to use"); - /** Location of the ZooKeeper jar - Used internally, not meant for users */ - String ZOOKEEPER_JAR = "giraph.zkJar"; - /** Local ZooKeeper directory to use */ String ZOOKEEPER_DIR = "giraph.zkDir"; http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 573bd95..7ab291b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -211,7 +211,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, conf.createComputationFactory().initialize(conf); // Do some task setup (possibly starting up a Zookeeper service) context.setStatus("setup: Initializing Zookeeper services."); - locateZookeeperClasspath(zkPathList); String serverPortList = conf.getZookeeperList(); if (serverPortList.isEmpty()) { if (startZooKeeperManager()) { @@ -581,44 +580,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } /** - * Attempt to locate the local copies of the ZK jar files, assuming - * the underlying cluster framework has provided them for us. - * @param fileClassPaths the path to the ZK jars on the local cluster. - */ - private void locateZookeeperClasspath(Path[] fileClassPaths) - throws IOException { - String zkClasspath = null; - if (fileClassPaths == null) { - if (LOG.isInfoEnabled()) { - LOG.info("Distributed cache is empty. Assuming fatjar."); - } - String jarFile = context.getJar(); - if (jarFile == null) { - jarFile = findContainingJar(getClass()); - } - // Pure YARN profiles will use unpacked resources, so calls - // to "findContainingJar()" in that context can return NULL! - zkClasspath = null == jarFile ? - "./*" : jarFile.replaceFirst("file:", ""); - } else { - StringBuilder sb = new StringBuilder(); - sb.append(fileClassPaths[0]); - - for (int i = 1; i < fileClassPaths.length; i++) { - sb.append(":"); - sb.append(fileClassPaths[i]); - } - zkClasspath = sb.toString(); - } - - if (LOG.isInfoEnabled()) { - LOG.info("setup: classpath @ " + zkClasspath + " for job " + - context.getJobName()); - } - conf.setZooKeeperJar(zkClasspath); - } - - /** * Initialize the root logger and appender to the settings in conf. */ private void initializeAndConfigureLogging() { @@ -884,14 +845,29 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, try { if (masterThread != null) { masterThread.join(); + LOG.info("cleanup: Joined with master thread"); } } catch (InterruptedException e) { // cleanup phase -- just log the error LOG.error("cleanup: Master thread couldn't join"); } if (zkManager != null) { - zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED); + LOG.info("cleanup: Offlining ZooKeeper servers"); + try { + zkManager.offlineZooKeeperServers(ZooKeeperManager.State.FINISHED); + // We need this here cause apparently exceptions are eaten by Hadoop + // when they come from the cleanup lifecycle and it's useful to know + // if something is wrong. + // + // And since it's cleanup nothing too bad should happen if we don't + // propagate and just allow the job to finish normally. + // CHECKSTYLE: stop IllegalCatch + } catch (Throwable e) { + // CHECKSTYLE: resume IllegalCatch + LOG.error("cleanup: Error offlining zookeeper", e); + } } + // Stop tracking metrics GiraphMetrics.get().shutdown(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java index 70166b6..9f5924d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java @@ -113,7 +113,6 @@ public class GiraphYarnClient { * @return true if job is successful */ public boolean run(final boolean verbose) throws YarnException, IOException { - checkJobLocalZooKeeperSupported(); // init our connection to YARN ResourceManager RPC LOG.info("Running Client"); yarnClient.start(); http://git-wip-us.apache.org/repos/asf/giraph/blob/c1d50bca/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java index 66f627f..348580c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java @@ -633,14 +633,13 @@ public class ZooKeeperManager { "onlineZooKeeperServers: java.home is not set!"); } commandList.add(javaHome + "/bin/java"); + commandList.add("-cp"); + commandList.add(System.getProperty("java.class.path")); String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf); String[] zkJavaOptsArray = zkJavaOptsString.split(" "); if (zkJavaOptsArray != null) { commandList.addAll(Arrays.asList(zkJavaOptsArray)); } - commandList.add("-cp"); - Path fullJarPath = new Path(conf.get(GiraphConstants.ZOOKEEPER_JAR)); - commandList.add(fullJarPath.toString()); commandList.add(QuorumPeerMain.class.getName()); commandList.add(configFilePath); processBuilder.command(commandList); @@ -806,18 +805,18 @@ public class ZooKeeperManager { } /** - * Wait for all map tasks to signal completion. Will wait up to + * Wait for all workers to signal completion. Will wait up to * WAIT_TASK_DONE_TIMEOUT_MS milliseconds for this to complete before * reporting an error. * - * @param totalMapTasks Number of map tasks to wait for + * @param totalWorkers Number of workers to wait for */ - private void waitUntilAllTasksDone(int totalMapTasks) { + private void waitUntilAllTasksDone(int totalWorkers) { int attempt = 0; long maxMs = time.getMilliseconds() + conf.getWaitTaskDoneTimeoutMs(); while (true) { - boolean[] taskDoneArray = new boolean[totalMapTasks]; + boolean[] taskDoneArray = new boolean[totalWorkers]; try { FileStatus [] fileStatusArray = fs.listStatus(taskDirectory); @@ -834,12 +833,12 @@ public class ZooKeeperManager { } if (LOG.isInfoEnabled()) { LOG.info("waitUntilAllTasksDone: Got " + totalDone + - " and " + totalMapTasks + + " and " + totalWorkers + " desired (polling period is " + pollMsecs + ") on attempt " + attempt); } - if (totalDone >= totalMapTasks) { + if (totalDone >= totalWorkers) { break; } else { StringBuilder sb = new StringBuilder(); @@ -882,8 +881,15 @@ public class ZooKeeperManager { } synchronized (this) { if (zkProcess != null) { - int totalMapTasks = conf.getMapTasks(); - waitUntilAllTasksDone(totalMapTasks); + boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf); + int totalWorkers = conf.getMapTasks(); + // A Yarn job always spawns MAX_WORKERS + 1 containers + if (isYarnJob) { + totalWorkers = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1; + } + LOG.info("offlineZooKeeperServers: Will wait for " + + totalWorkers + " tasks"); + waitUntilAllTasksDone(totalWorkers); zkProcess.destroy(); int exitValue = -1; File zkDirFile;
