Repository: giraph
Updated Branches:
  refs/heads/trunk 78931c03f -> dc4d9a2a7


GIRAPH-975 In-proc ZooKeeper server with Master process

https://reviews.facebook.net/D30693


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dc4d9a2a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dc4d9a2a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dc4d9a2a

Branch: refs/heads/trunk
Commit: dc4d9a2a7f5d2e40fc1e28c3b8da011306d7dccc
Parents: 78931c0
Author: Sergey Edunov <[email protected]>
Authored: Tue Dec 23 16:04:11 2014 -0800
Committer: Sergey Edunov <[email protected]>
Committed: Mon Jan 5 18:37:45 2015 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../org/apache/giraph/conf/GiraphConstants.java |   9 +
 .../apache/giraph/graph/GraphTaskManager.java   |   2 +-
 .../giraph/zk/InProcessZooKeeperRunner.java     | 168 ++++++++++++++
 .../giraph/zk/OutOfProcessZooKeeperRunner.java  | 227 +++++++++++++++++++
 .../org/apache/giraph/zk/ZooKeeperManager.java  | 196 +++-------------
 .../org/apache/giraph/zk/ZooKeeperRunner.java   |  46 ++++
 7 files changed, 486 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index fbefa2f..08fe806 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  GIRAPH-975: In-proc ZooKeeper server with Master process (edunov)
+
   GIRAPH-977: useMessageSizeEncoding is broken (majakabiljo)
 
   GIRAPH-976: More command line logging (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e78eb42..bbf3bd2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -554,6 +554,15 @@ public interface GiraphConstants {
           "Msecs to wait before retrying a failed ZooKeeper op due to " +
           "connection loss.");
 
+  /**
+   * Whether zookeeper runs inside the master java process (default) or is
+   * spawned separately. NOTE(review): constant name misspells ZOOKEEPER.
+   */
+  BooleanConfOption ZOOKEEEPER_RUNS_IN_PROCESS = new BooleanConfOption(
+      "giraph.zkRunsInProcess",
+      true, "If true run zookeeper in master process, if false starts " +
+      "separate process for zookeeper");
+
   /** TCP backlog (defaults to number of workers) */
   IntConfOption TCP_BACKLOG = new IntConfOption("giraph.tcpBacklog", 1,
       "TCP backlog (defaults to number of workers)");

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java 
b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 7d19014..d479d74 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -956,7 +956,7 @@ public class GraphTaskManager<I extends WritableComparable, 
V extends Writable,
     if (graphFunctions.isZooKeeper()) {
       // ZooKeeper may have had an issue
       if (zkManager != null) {
-        zkManager.logZooKeeperOutput(Level.WARN);
+        zkManager.cleanup();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java 
b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
new file mode 100644
index 0000000..5556216
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/zk/InProcessZooKeeperRunner.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.zk;
+
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.ManagedUtil;
+import org.apache.zookeeper.server.DatadirCleanupManager;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * Zookeeper wrapper that starts zookeeper within the master process.
+ * The server runs on a daemon thread, so {@link #start} returns
+ * without waiting for zookeeper to accept connections.
+ */
+public class InProcessZooKeeperRunner
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements ZooKeeperRunner {
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(InProcessZooKeeperRunner.class);
+  /**
+   * Wrapper for zookeeper quorum.
+   */
+  private final QuorumRunner quorumRunner = new QuorumRunner();
+
+  /**
+   * Starts zookeeper on a background daemon thread. Startup failures
+   * are logged on that thread and not propagated to the caller.
+   *
+   * @param zkDir not used; data directories come from the config file
+   * @param configFilePath zookeeper configuration file
+   */
+  @Override
+  public void start(String zkDir, final String configFilePath) {
+    Thread zkThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          quorumRunner.start(configFilePath);
+        } catch (IOException e) {
+          LOG.error("Unable to start zookeeper", e);
+        } catch (QuorumPeerConfig.ConfigException e) {
+          LOG.error("Invalid config, zookeeper failed", e);
+        }
+      }
+    }, InProcessZooKeeperRunner.class.getSimpleName());
+    zkThread.setDaemon(true);
+    zkThread.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      quorumRunner.stop();
+    } catch (InterruptedException e) {
+      LOG.error("Unable to cleanly shutdown zookeeper", e);
+      // Restore the interrupt status so callers can still observe it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** No-op: there is no separate process output to collect. */
+  @Override
+  public void cleanup() {
+  }
+
+  /**
+   * Wrapper around zookeeper quorum. Does not necessarily
+   * start a quorum: if the config file lists no servers, only a
+   * standalone zookeeper server is started.
+   */
+  private static class QuorumRunner extends QuorumPeerMain {
+
+    /**
+     * ZooKeeper server wrapper. Assigned on the zookeeper thread and
+     * read by the thread calling {@link #stop()}, hence volatile.
+     */
+    private volatile ZooKeeperServerRunner serverRunner;
+    /** Scheduled purge task, or null if autopurge is not enabled. */
+    private volatile DatadirCleanupManager purgeMgr;
+
+    /**
+     * Starts quorum and/or zookeeper service. In ZooKeeper 3.4 the
+     * runFromConfig calls do not return until the service shuts down,
+     * so this method blocks for the lifetime of the server.
+     * @param configFilePath quorum and zookeeper configuration
+     * @throws IOException if zookeeper fails to start
+     * @throws QuorumPeerConfig.ConfigException if config
+     * is not formatted properly
+     */
+    public void start(String configFilePath) throws IOException,
+        QuorumPeerConfig.ConfigException {
+      QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
+      quorumPeerConfig.parse(configFilePath);
+      // Start and schedule the purge task
+      DatadirCleanupManager cleanupMgr = new DatadirCleanupManager(
+          quorumPeerConfig.getDataDir(), quorumPeerConfig.getDataLogDir(),
+          quorumPeerConfig.getSnapRetainCount(),
+          quorumPeerConfig.getPurgeInterval());
+      cleanupMgr.start();
+      if (quorumPeerConfig.getPurgeInterval() > 0) {
+        // Only a positive interval schedules a purge timer to cancel later
+        purgeMgr = cleanupMgr;
+      }
+
+      LOG.info("Initialization ended");
+      if (quorumPeerConfig.getServers().size() > 0) {
+        runFromConfig(quorumPeerConfig);
+      } else {
+        serverRunner = new ZooKeeperServerRunner();
+        serverRunner.start(configFilePath);
+      }
+      LOG.info("start: zookeeper service has stopped");
+    }
+
+    /**
+     * Stop quorum and/or zookeeper, then cancel the purge task.
+     * @throws InterruptedException if interrupted while waiting for the
+     * quorum peer to exit
+     */
+    public void stop() throws InterruptedException {
+      try {
+        if (quorumPeer != null) {
+          quorumPeer.shutdown();
+          quorumPeer.join();
+        } else if (serverRunner != null) {
+          serverRunner.stop();
+        } else {
+          LOG.warn("Neither quorum nor server is set");
+        }
+      } finally {
+        if (purgeMgr != null) {
+          purgeMgr.shutdown();
+        }
+      }
+    }
+  }
+
+  /**
+   * Wrapper around zookeeper service.
+   */
+  private static class ZooKeeperServerRunner extends ZooKeeperServerMain {
+
+    /**
+     * Start zookeeper service; blocks until the service shuts down.
+     * @param configFilePath zookeeper configuration file
+     * @throws QuorumPeerConfig.ConfigException if config file is not
+     * formatted properly
+     * @throws IOException if zookeeper fails to start
+     */
+    public void start(String configFilePath) throws
+        QuorumPeerConfig.ConfigException, IOException {
+      LOG.warn("Either no config or no quorum defined in config, running " +
+          "in standalone mode");
+      try {
+        ManagedUtil.registerLog4jMBeans();
+      } catch (JMException e) {
+        LOG.warn("Unable to register log4j JMX control", e);
+      }
+
+      ServerConfig serverConfig = new ServerConfig();
+      serverConfig.parse(configFilePath);
+      runFromConfig(serverConfig);
+    }
+
+    /**
+     * Stop zookeeper service.
+     */
+    public void stop() {
+      shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
 
b/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
new file mode 100644
index 0000000..c86a199
--- /dev/null
+++ 
b/giraph-core/src/main/java/org/apache/giraph/zk/OutOfProcessZooKeeperRunner.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.zk;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Zookeeper wrapper that starts zookeeper in the separate process (old way).
+ */
+public class OutOfProcessZooKeeperRunner
+    extends DefaultImmutableClassesGiraphConfigurable
+    implements ZooKeeperRunner {
+
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(OutOfProcessZooKeeperRunner.class);
+
+  /** ZooKeeper process, null when not running; guarded by processLock */
+  private Process zkProcess;
+  /** Thread that gets the zkProcess output */
+  private volatile StreamCollector zkProcessCollector = null;
+  /** Synchronization lock for zkProcess */
+  private final Object processLock = new Object();
+
+  @Override
+  public void start(String zkDir, String configFilePath) {
+    try {
+      ProcessBuilder processBuilder = new ProcessBuilder();
+      List<String> commandList = Lists.newArrayList();
+      String javaHome = System.getProperty("java.home");
+      if (javaHome == null) {
+        throw new IllegalArgumentException(
+            "onlineZooKeeperServers: java.home is not set!");
+      }
+      commandList.add(javaHome + "/bin/java");
+      commandList.add("-cp");
+      commandList.add(System.getProperty("java.class.path"));
+      String zkJavaOptsString =
+          GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(getConf());
+      // Split on runs of whitespace and drop empty tokens: an empty
+      // argument would be taken by the java launcher as the main class.
+      for (String zkJavaOpt : zkJavaOptsString.trim().split("\\s+")) {
+        if (!zkJavaOpt.isEmpty()) {
+          commandList.add(zkJavaOpt);
+        }
+      }
+      commandList.add(QuorumPeerMain.class.getName());
+      commandList.add(configFilePath);
+      processBuilder.command(commandList);
+      File execDirectory = new File(zkDir);
+      processBuilder.directory(execDirectory);
+      processBuilder.redirectErrorStream(true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("onlineZooKeeperServers: Attempting to " +
+            "start ZooKeeper server with command " + commandList +
+            " in directory " + execDirectory.toString());
+      }
+      synchronized (processLock) {
+        zkProcess = processBuilder.start();
+        zkProcessCollector =
+            new StreamCollector(zkProcess.getInputStream());
+        zkProcessCollector.start();
+      }
+      Runnable runnable = new Runnable() {
+        public void run() {
+          LOG.info("run: Shutdown hook started.");
+          synchronized (processLock) {
+            if (zkProcess != null) {
+              LOG.warn("onlineZooKeeperServers: " +
+                  "Forced a shutdown hook kill of the " +
+                  "ZooKeeper process.");
+              zkProcess.destroy();
+              int exitCode = -1;
+              try {
+                exitCode = zkProcess.waitFor();
+              } catch (InterruptedException e) {
+                LOG.warn("run: Couldn't get exit code.");
+              }
+              LOG.info("onlineZooKeeperServers: ZooKeeper process exited " +
+                  "with " + exitCode + " (note that 143 " +
+                  "typically means killed).");
+            }
+          }
+        }
+      };
+      Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+      LOG.info("onlineZooKeeperServers: Shutdown hook added.");
+    } catch (IOException e) {
+      LOG.error("onlineZooKeeperServers: Failed to start " +
+          "ZooKeeper process", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Kills the zookeeper process and waits for it to exit. The process
+   * reference is cleared first so the shutdown hook does not try to kill
+   * it a second time (and log a spurious forced-kill warning).
+   */
+  @Override
+  public void stop() {
+    Process process;
+    synchronized (processLock) {
+      process = zkProcess;
+      zkProcess = null;
+    }
+    if (process == null) {
+      LOG.warn("offlineZooKeeperServers: No ZooKeeper process to stop");
+      return;
+    }
+    process.destroy();
+    int exitValue = -1;
+    try {
+      zkProcessCollector.join();
+      exitValue = process.waitFor();
+    } catch (InterruptedException e) {
+      LOG.warn("offlineZooKeeperServers: " +
+              "InterruptedException, but continuing ",
+          e);
+      // Restore the interrupt status so callers can still observe it
+      Thread.currentThread().interrupt();
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("offlineZooKeeperServers: waitFor returned " +
+          exitValue);
+    }
+  }
+
+  @Override
+  public void cleanup() {
+    logZooKeeperOutput(Level.WARN);
+  }
+
+  /**
+   * Collects the output of a stream and dumps it to the log.
+   */
+  private static class StreamCollector extends Thread {
+    /** Number of last lines to keep */
+    private static final int LAST_LINES_COUNT = 100;
+    /** Class logger */
+    private static final Logger LOG = Logger.getLogger(StreamCollector.class);
+    /** Buffered reader of input stream, only read by this thread */
+    private final BufferedReader bufferedReader;
+    /** Last lines (help to debug failures); guarded by itself */
+    private final LinkedList<String> lastLines = Lists.newLinkedList();
+    /**
+     * Constructor.
+     *
+     * @param is InputStream to dump to LOG.info
+     */
+    public StreamCollector(final InputStream is) {
+      super(StreamCollector.class.getName());
+      setDaemon(true);
+      InputStreamReader streamReader = new InputStreamReader(is,
+          Charset.defaultCharset());
+      bufferedReader = new BufferedReader(streamReader);
+    }
+
+    @Override
+    public void run() {
+      readLines();
+    }
+
+    /**
+     * Read all the lines from the bufferedReader. The lock is held only
+     * while updating lastLines, never across a blocking readLine(), so
+     * dumpLastLines() cannot stall while the process is still running.
+     */
+    private void readLines() {
+      String line;
+      try {
+        while ((line = bufferedReader.readLine()) != null) {
+          synchronized (lastLines) {
+            // ">=" keeps exactly LAST_LINES_COUNT lines
+            if (lastLines.size() >= LAST_LINES_COUNT) {
+              lastLines.removeFirst();
+            }
+            lastLines.add(line);
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("readLines: " + line);
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("readLines: Ignoring IOException", e);
+      }
+    }
+
+    /**
+     * Dump the last n lines of the collector.  Likely used in
+     * the case of failure. Lines the collector thread has not read yet
+     * are not included.
+     *
+     * @param level Log level to dump with
+     */
+    public void dumpLastLines(Level level) {
+      List<String> snapshot;
+      synchronized (lastLines) {
+        snapshot = Lists.newArrayList(lastLines);
+      }
+      // Log outside the lock so the collector thread is never blocked
+      for (String line : snapshot) {
+        LOG.log(level, line);
+      }
+    }
+  }
+
+  /**
+   * Log the zookeeper output from the process (if it was started)
+   *
+   * @param level Log level to print at
+   */
+  public void logZooKeeperOutput(Level level) {
+    StreamCollector collector = zkProcessCollector;
+    if (collector != null) {
+      LOG.log(level, "logZooKeeperOutput: Dumping up to last " +
+          StreamCollector.LAST_LINES_COUNT +
+          " lines of the ZooKeeper process STDOUT and STDERR.");
+      collector.dumpLastLines(level);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java 
b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
index b5816d7..82a408b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.zk;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -32,25 +31,18 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.Writer;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -97,10 +89,8 @@ public class ZooKeeperManager {
   private final int serverCount;
   /** File system */
   private final FileSystem fs;
-  /** ZooKeeper process */
-  private Process zkProcess = null;
-  /** Thread that gets the zkProcess output */
-  private StreamCollector zkProcessCollector = null;
+  /** Zookeeper wrapper */
+  private ZooKeeperRunner zkRunner;
   /** ZooKeeper local file system directory */
   private final String zkDir;
   /** ZooKeeper config file path */
@@ -200,73 +190,6 @@ public class ZooKeeperManager {
   }
 
   /**
-   * Collects the output of a stream and dumps it to the log.
-   */
-  private static class StreamCollector extends Thread {
-    /** Number of last lines to keep */
-    private static final int LAST_LINES_COUNT = 100;
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(StreamCollector.class);
-    /** Buffered reader of input stream */
-    private final BufferedReader bufferedReader;
-    /** Last lines (help to debug failures) */
-    private final LinkedList<String> lastLines = Lists.newLinkedList();
-    /**
-     * Constructor.
-     *
-     * @param is InputStream to dump to LOG.info
-     */
-    public StreamCollector(final InputStream is) {
-      super(StreamCollector.class.getName());
-      setDaemon(true);
-      InputStreamReader streamReader = new InputStreamReader(is,
-          Charset.defaultCharset());
-      bufferedReader = new BufferedReader(streamReader);
-    }
-
-    @Override
-    public void run() {
-      readLines();
-    }
-
-    /**
-     * Read all the lines from the bufferedReader.
-     */
-    private synchronized void readLines() {
-      String line;
-      try {
-        while ((line = bufferedReader.readLine()) != null) {
-          if (lastLines.size() > LAST_LINES_COUNT) {
-            lastLines.removeFirst();
-          }
-          lastLines.add(line);
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("readLines: " + line);
-          }
-        }
-      } catch (IOException e) {
-        LOG.error("readLines: Ignoring IOException", e);
-      }
-    }
-
-    /**
-     * Dump the last n lines of the collector.  Likely used in
-     * the case of failure.
-     *
-     * @param level Log level to dump with
-     */
-    public synchronized void dumpLastLines(Level level) {
-      // Get any remaining lines
-      readLines();
-      // Dump the lines to the screen
-      for (String line : lastLines) {
-        LOG.log(level, line);
-      }
-    }
-  }
-
-  /**
    * Create the candidate stamps and decide on the servers to start if
    * you are partition 0.
    *
@@ -652,67 +575,9 @@ public class ZooKeeperManager {
             "directory " + this.zkDir, e);
       }
       generateZooKeeperConfigFile(new ArrayList<>(zkServerPortMap.keySet()));
-      ProcessBuilder processBuilder = new ProcessBuilder();
-      List<String> commandList = Lists.newArrayList();
-      String javaHome = System.getProperty("java.home");
-      if (javaHome == null) {
-        throw new IllegalArgumentException(
-            "onlineZooKeeperServers: java.home is not set!");
-      }
-      commandList.add(javaHome + "/bin/java");
-      commandList.add("-cp");
-      commandList.add(System.getProperty("java.class.path"));
-      String zkJavaOptsString = GiraphConstants.ZOOKEEPER_JAVA_OPTS.get(conf);
-      String[] zkJavaOptsArray = zkJavaOptsString.split(" ");
-      if (zkJavaOptsArray != null) {
-        commandList.addAll(Arrays.asList(zkJavaOptsArray));
-      }
-      commandList.add(QuorumPeerMain.class.getName());
-      commandList.add(configFilePath);
-      processBuilder.command(commandList);
-      File execDirectory = new File(zkDir);
-      processBuilder.directory(execDirectory);
-      processBuilder.redirectErrorStream(true);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("onlineZooKeeperServers: Attempting to " +
-            "start ZooKeeper server with command " + commandList +
-            " in directory " + execDirectory.toString());
-      }
-      try {
-        synchronized (this) {
-          zkProcess = processBuilder.start();
-          zkProcessCollector =
-              new StreamCollector(zkProcess.getInputStream());
-          zkProcessCollector.start();
-        }
-        Runnable runnable = new Runnable() {
-          public void run() {
-            LOG.info("run: Shutdown hook started.");
-            synchronized (this) {
-              if (zkProcess != null) {
-                LOG.warn("onlineZooKeeperServers: " +
-                         "Forced a shutdown hook kill of the " +
-                         "ZooKeeper process.");
-                zkProcess.destroy();
-                int exitCode = -1;
-                try {
-                  exitCode = zkProcess.waitFor();
-                } catch (InterruptedException e) {
-                  LOG.warn("run: Couldn't get exit code.");
-                }
-                LOG.info("onlineZooKeeperServers: ZooKeeper process exited " +
-                    "with " + exitCode + " (note that 143 " +
-                    "typically means killed).");
-              }
-            }
-          }
-        };
-        Runtime.getRuntime().addShutdownHook(new Thread(runnable));
-        LOG.info("onlineZooKeeperServers: Shutdown hook added.");
-      } catch (IOException e) {
-        LOG.error("onlineZooKeeperServers: Failed to start " +
-            "ZooKeeper process", e);
-        throw new RuntimeException(e);
+      synchronized (this) {
+        zkRunner = createRunner();
+        zkRunner.start(zkDir, configFilePath);
       }
 
       // Once the server is up and running, notify that this server is up
@@ -907,7 +772,7 @@ public class ZooKeeperManager {
       createZooKeeperClosedStamp();
     }
     synchronized (this) {
-      if (zkProcess != null) {
+      if (zkRunner != null) {
         boolean isYarnJob = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
         int totalWorkers = conf.getMapTasks();
         // A Yarn job always spawns MAX_WORKERS + 1 containers
@@ -917,33 +782,43 @@ public class ZooKeeperManager {
         LOG.info("offlineZooKeeperServers: Will wait for " +
             totalWorkers + " tasks");
         waitUntilAllTasksDone(totalWorkers);
-        zkProcess.destroy();
-        int exitValue = -1;
+        zkRunner.stop();
         File zkDirFile;
         try {
-          zkProcessCollector.join();
-          exitValue = zkProcess.waitFor();
           zkDirFile = new File(zkDir);
           FileUtils.deleteDirectory(zkDirFile);
-        } catch (InterruptedException e) {
-          LOG.warn("offlineZooKeeperServers: " +
-              "InterruptedException, but continuing ",
-              e);
         } catch (IOException e) {
           LOG.warn("offlineZooKeeperSevers: " +
-              "IOException, but continuing",
+                  "IOException, but continuing",
               e);
         }
         if (LOG.isInfoEnabled()) {
-          LOG.info("offlineZooKeeperServers: waitFor returned " +
-              exitValue + " and deleted directory " + zkDir);
+          LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
         }
-        zkProcess = null;
+        zkRunner = null;
       }
     }
   }
 
   /**
+   * Create appropriate zookeeper wrapper depending on configuration.
+   * Zookeeper can run in master process or outside as a separate
+   * java process.
+   *
+   * @return either in process or out of process wrapper.
+   */
+  private ZooKeeperRunner createRunner() {
+    // The option defaults to true, so zookeeper is hosted inside this
+    // JVM unless the old separate-process mode is requested.
+    boolean inProcess = GiraphConstants.ZOOKEEEPER_RUNS_IN_PROCESS.get(conf);
+    ZooKeeperRunner runner = inProcess ?
+        new InProcessZooKeeperRunner() :
+        new OutOfProcessZooKeeperRunner();
+    runner.setConf(conf);
+    return runner;
+  }
+
+  /**
    *  Is this task running a ZooKeeper server?  Only could be true if called
    *  after onlineZooKeeperServers().
    *
@@ -951,21 +826,22 @@ public class ZooKeeperManager {
    */
   public boolean runsZooKeeper() {
     synchronized (this) {
-      return zkProcess != null;
+      return zkRunner != null;
     }
   }
 
   /**
-   * Log the zookeeper output from the process (if it was started)
-   *
-   * @param level Log level to print at
+   * Do necessary cleanup in zookeeper wrapper, if one was created.
+   * The runner reference is read under the same lock as the other
+   * zkRunner accessors; the runner itself is called without the lock.
    */
-  public void logZooKeeperOutput(Level level) {
-    if (zkProcessCollector != null) {
-      LOG.log(level, "logZooKeeperOutput: Dumping up to last " +
-          StreamCollector.LAST_LINES_COUNT +
-          " lines of the ZooKeeper process STDOUT and STDERR.");
-      zkProcessCollector.dumpLastLines(level);
+  public void cleanup() {
+    ZooKeeperRunner runner;
+    synchronized (this) {
+      runner = zkRunner;
+    }
+    if (runner != null) {
+      runner.cleanup();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/dc4d9a2a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java 
b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
new file mode 100644
index 0000000..4c13a25
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperRunner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.zk;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+
+/**
+ * ZooKeeper wrapper interface: hides whether zookeeper runs inside the
+ * current JVM or as a separate process. Implementations provide a way to
+ * start, stop and clean up zookeeper.
+ */
+public interface ZooKeeperRunner extends ImmutableClassesGiraphConfigurable {
+
+  /**
+   * Starts zookeeper with the given config file. May return before the
+   * service accepts connections, so callers should not assume it is ready.
+   * @param zkDir working directory (some implementations ignore it)
+   * @param configFilePath path to the zookeeper config file
+   */
+  void start(String zkDir, String configFilePath);
+
+  /**
+   * Stops zookeeper previously started with {@link #start}.
+   */
+  void stop();
+
+  /**
+   * Best-effort cleanup hook, e.g. dumping diagnostics after a failure.
+   */
+  void cleanup();
+}

Reply via email to