Updated Branches: refs/heads/trunk 3a20c5597 -> e5a21c4bb
GIRAPH-755: Make ZooKeeper port list available to input/output format (armax00 via claudio) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e5a21c4b Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e5a21c4b Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e5a21c4b Branch: refs/heads/trunk Commit: e5a21c4bb27055cad85f1d9dd494867c0fae8c22 Parents: 3a20c55 Author: Claudio Martella <[email protected]> Authored: Wed Nov 6 14:52:45 2013 +0100 Committer: Claudio Martella <[email protected]> Committed: Wed Nov 6 14:52:45 2013 +0100 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../java/org/apache/giraph/bsp/BspService.java | 5 ++-- .../apache/giraph/conf/GiraphConfiguration.java | 21 ++++++++++++++ .../org/apache/giraph/conf/GiraphConstants.java | 16 ++++++++++- .../apache/giraph/graph/GraphTaskManager.java | 29 ++++++++------------ .../apache/giraph/master/BspServiceMaster.java | 6 ++-- .../apache/giraph/worker/BspServiceWorker.java | 4 +-- .../apache/giraph/yarn/GiraphYarnClient.java | 6 ++-- 8 files changed, 58 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 04656f0..cd04a2b 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-755: Make ZooKeeper port list available to input/output format (armax00 via claudio) + GIRAPH-737: Giraph Application Master: Move to new and stable YARN API (mislam via ereisman) GIRAPH-791: HiveGiraphRunner picks -D options too late (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 34f4b51..86823ed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -253,13 +253,11 @@ public abstract class BspService<I extends WritableComparable, /** * Constructor. * - * @param serverPortList ZooKeeper server port list * @param sessionMsecTimeout ZooKeeper session timeount in milliseconds * @param context Mapper context * @param graphTaskManager GraphTaskManager for this compute node */ - public BspService(String serverPortList, - int sessionMsecTimeout, + public BspService(int sessionMsecTimeout, Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) { this.vertexInputSplitsEvents = new InputSplitEvents(context); @@ -322,6 +320,7 @@ public abstract class BspService<I extends WritableComparable, CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(), CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId()); masterElectionPath = basePath + MASTER_ELECTION_DIR; + String serverPortList = conf.getZookeeperList(); haltComputationPath = basePath + HALT_COMPUTATION_NODE; getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP, haltComputationPath); http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java index f176bfe..d066513 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java @@ -731,6 
+731,27 @@ public class GiraphConfiguration extends Configuration return get(ZOOKEEPER_LIST); } + /** + * Set the ZooKeeper list to the provided list. This method is used when + * ZooKeeper is started internally, so it also sets the zkIsExternal option + * to false. + * + * @param zkList comma-separated list of ZooKeeper servers + */ + public void setZookeeperList(String zkList) { + set(ZOOKEEPER_LIST, zkList); + ZOOKEEPER_IS_EXTERNAL.set(this, false); + } + + /** + * Was ZooKeeper provided externally? + * + * @return true iff ZooKeeper was provided externally + */ + public boolean isZookeeperExternal() { + return ZOOKEEPER_IS_EXTERNAL.get(this); + } + public String getLocalLevel() { return LOG_LEVEL.get(this); } http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 6f32e46..3f379f1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -404,10 +404,24 @@ public interface GiraphConstants { /** * ZooKeeper comma-separated list (if not set, - * will start up ZooKeeper locally) + * will start up ZooKeeper locally). Note that after ZooKeeper is started + * locally, this parameter is updated in the configuration with the actual + * server list. */ String ZOOKEEPER_LIST = "giraph.zkList"; + /** + * Whether the ZooKeeper quorum in giraph.zkList was provided externally + * (true) or started internally by Giraph (false). In either case the + * ZooKeeper list holds a value during the computation.
+ */ + BooleanConfOption ZOOKEEPER_IS_EXTERNAL = + new BooleanConfOption("giraph.zkIsExternal", true, + "Zookeeper List will always hold a value during " + + "the computation while this option provides " + + "information regarding whether the zookeeper was " + + "internally started or externally provided."); + /** ZooKeeper session millisecond timeout */ IntConfOption ZOOKEEPER_SESSION_TIMEOUT = new IntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1), http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index 3939d49..f31d99e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -144,8 +144,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, private GiraphTimerContext communicationTimerContext; /** Timer for WorkerContext#preSuperstep() */ private GiraphTimer wcPreSuperstepTimer; - /** Zookeeper host:port list */ - private String serverPortList; /** The Hadoop Mapper#Context for this job */ private Mapper<?, ?, ?, ?>.Context context; /** is this GraphTaskManager the master? 
*/ @@ -200,7 +198,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, // Do some task setup (possibly starting up a Zookeeper service) context.setStatus("setup: Initializing Zookeeper services."); locateZookeeperClasspath(zkPathList); - serverPortList = conf.getZookeeperList(); + String serverPortList = conf.getZookeeperList(); if (serverPortList == null && startZooKeeperManager()) { return; // ZK connect/startup failed } @@ -219,7 +217,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, } int sessionMsecTimeout = conf.getZooKeeperSessionTimeout(); try { - instantiateBspService(serverPortList, sessionMsecTimeout); + instantiateBspService(sessionMsecTimeout); } catch (IOException e) { LOG.error("setup: Caught exception just before end of setup", e); if (zkManager != null) { @@ -369,7 +367,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, return true; } zkManager.onlineZooKeeperServers(); - serverPortList = zkManager.getZooKeeperServerPortString(); + String serverPortList = zkManager.getZooKeeperServerPortString(); + conf.setZookeeperList(serverPortList); context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP, serverPortList); return false; @@ -493,9 +492,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, * ZooKeeper. * 2) If split master/worker, masters also run ZooKeeper * - * 3) If split master/worker == true and <code>giraph.zkList</code> is set, - * the master will not instantiate a ZK instance, but will assume - * a quorum is already active on the cluster for Giraph to use. + * 3) If split master/worker == true and <code>giraph.zkList</code> is + * externally provided, the master will not instantiate a ZK instance, but + * will assume a quorum is already active on the cluster for Giraph to use. 
* * @param conf Configuration to use * @param zkManager ZooKeeper manager to help determine whether to run @@ -507,7 +506,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, ZooKeeperManager zkManager) { boolean splitMasterWorker = conf.getSplitMasterWorker(); int taskPartition = conf.getTaskPartition(); - boolean zkAlreadyProvided = conf.getZookeeperList() != null; + boolean zkAlreadyProvided = conf.isZookeeperExternal(); GraphFunctions functions = GraphFunctions.UNKNOWN; // What functions should this mapper do? if (!splitMasterWorker) { @@ -538,18 +537,17 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** * Instantiate the appropriate BspService object (Master or Worker) * for this compute node. - * @param serverPortList host:port list for connecting to ZK quorum * @param sessionMsecTimeout configurable session timeout */ - private void instantiateBspService(String serverPortList, - int sessionMsecTimeout) throws IOException, InterruptedException { + private void instantiateBspService(int sessionMsecTimeout) + throws IOException, InterruptedException { if (graphFunctions.isMaster()) { if (LOG.isInfoEnabled()) { LOG.info("setup: Starting up BspServiceMaster " + "(master thread)..."); } serviceMaster = new BspServiceMaster<I, V, E>( - serverPortList, sessionMsecTimeout, context, this); + sessionMsecTimeout, context, this); masterThread = new MasterThread<I, V, E>(serviceMaster, context); masterThread.start(); } @@ -558,10 +556,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, LOG.info("setup: Starting up BspServiceWorker..."); } serviceWorker = new BspServiceWorker<I, V, E>( - serverPortList, - sessionMsecTimeout, - context, - this); + sessionMsecTimeout, context, this); if (LOG.isInfoEnabled()) { LOG.info("setup: Registering health of this worker..."); } 
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index f043c61..baa8434 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -187,17 +187,15 @@ public class BspServiceMaster<I extends WritableComparable, /** * Constructor for setting up the master. * - * @param serverPortList ZooKeeper server port list * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper * @param context Mapper context * @param graphTaskManager GraphTaskManager for this compute node */ public BspServiceMaster( - String serverPortList, int sessionMsecTimeout, Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) { - super(serverPortList, sessionMsecTimeout, context, graphTaskManager); + super(sessionMsecTimeout, context, graphTaskManager); workerWroteCheckpoint = new PredicateLock(context); registerBspEvent(workerWroteCheckpoint); superstepStateChanged = new PredicateLock(context); @@ -1768,7 +1766,7 @@ public class BspServiceMaster<I extends WritableComparable, // and the master can do any final cleanup if the ZooKeeper service was // provided (not dynamically started) and we don't want to keep the data try { - if (getConfiguration().getZookeeperList() != null && + if (getConfiguration().isZookeeperExternal() && KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) { if (LOG.isInfoEnabled()) { LOG.info("cleanupZooKeeper: Removing the following path " + http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index a92ddf8..f6da680 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -169,7 +169,6 @@ public class BspServiceWorker<I extends WritableComparable, /** * Constructor for setting up the worker. * - * @param serverPortList ZooKeeper server port list * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper * @param context Mapper context * @param graphTaskManager GraphTaskManager for this compute node @@ -177,12 +176,11 @@ public class BspServiceWorker<I extends WritableComparable, * @throws InterruptedException */ public BspServiceWorker( - String serverPortList, int sessionMsecTimeout, Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) throws IOException, InterruptedException { - super(serverPortList, sessionMsecTimeout, context, graphTaskManager); + super(sessionMsecTimeout, context, graphTaskManager); ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration(); partitionExchangeChildrenChanged = new PredicateLock(context); registerBspEvent(partitionExchangeChildrenChanged); http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java index ab6564e..e8926eb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java @@ -455,11 +455,11 @@ public class GiraphYarnClient { * 
removed as we expand the functionality of the "pure YARN" Giraph profile. */ private void checkJobLocalZooKeeperSupported() { + final boolean isZkExternal = giraphConf.isZookeeperExternal(); final String checkZkList = giraphConf.getZookeeperList(); - if (checkZkList == null || checkZkList.isEmpty()) { + if (!isZkExternal || checkZkList.isEmpty()) { throw new IllegalArgumentException("Giraph on YARN does not currently" + - "support Giraph-managed ZK instances: use a standalone ZooKeeper: '" + - checkZkList + "'"); + "support Giraph-managed ZK instances: use a standalone ZooKeeper."); } }
