Updated Branches: refs/heads/trunk 2b95451e1 -> e1a7f2905
GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e1a7f290 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e1a7f290 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e1a7f290 Branch: refs/heads/trunk Commit: e1a7f2905b7afc0d621f129a092b5c7fc14e97ab Parents: 2b95451 Author: Eli Reisman <[email protected]> Authored: Thu Jan 10 21:17:53 2013 -0800 Committer: Eli Reisman <[email protected]> Committed: Thu Jan 10 21:17:53 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/conf/GiraphConstants.java | 14 +++++ .../org/apache/giraph/master/BspServiceMaster.java | 40 ++++++++++----- .../giraph/worker/InputSplitPathOrganizer.java | 41 +++++++++------ .../apache/giraph/worker/InputSplitsCallable.java | 14 ++++- .../test/java/org/apache/giraph/TestBspBasic.java | 9 ++-- 6 files changed, 83 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 78f01db..6c2c5e6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman) + GIRAPH-459: Group Vertex Mutations by Partition ID (claudio) GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman) http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 9acc50a..8e75e5b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -419,6 +419,20 @@ public interface GiraphConstants { */ long INPUT_SPLIT_MAX_EDGES_DEFAULT = -1; + /** + * To minimize network usage when reading input splits, + * each worker can prioritize splits that reside on its host. + * This, however, comes at the cost of increased load on ZooKeeper. + * Hence, users with a lot of splits and input threads (or with + * configurations that can't exploit locality) may want to disable it. + */ + String USE_INPUT_SPLIT_LOCALITY = "giraph.useInputSplitLocality"; + + /** + * Default is to prioritize local input splits. + */ + boolean USE_INPUT_SPLIT_LOCALITY_DEFAULT = true; + /** Java opts passed to ZooKeeper startup */ String ZOOKEEPER_JAVA_OPTS = "giraph.zkJavaOpts"; /** Default java opts passed to ZooKeeper startup */ http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 4483385..33f9f4a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -602,9 +602,13 @@ public class BspServiceMaster<I extends WritableComparable, } ExecutorService taskExecutor = Executors.newFixedThreadPool(inputSplitThreadCount); + boolean writeLocations = getConfiguration().getBoolean( + GiraphConstants.USE_INPUT_SPLIT_LOCALITY, + GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT); for (int i = 0; i < splitList.size(); ++i) { InputSplit inputSplit = splitList.get(i); - 
taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i)); + taskExecutor.submit(new WriteInputSplit(inputSplit, inputSplitsPath, i, + writeLocations)); } taskExecutor.shutdown(); ProgressableUtils.awaitExecutorTermination(taskExecutor, getContext()); @@ -1821,6 +1825,8 @@ public class BspServiceMaster<I extends WritableComparable, private final String inputSplitsPath; /** Index of the input split */ private final int index; + /** Whether to write locality information */ + private final boolean writeLocations; /** * Constructor @@ -1828,13 +1834,18 @@ public class BspServiceMaster<I extends WritableComparable, * @param inputSplit Input split which we are going to write * @param inputSplitsPath Input splits path * @param index Index of the input split + * @param writeLocations whether to write the input split's locations (to + * be used by workers for prioritizing local splits + * when reading) */ public WriteInputSplit(InputSplit inputSplit, String inputSplitsPath, - int index) { + int index, + boolean writeLocations) { this.inputSplit = inputSplit; this.inputSplitsPath = inputSplitsPath; this.index = index; + this.writeLocations = writeLocations; } @Override @@ -1846,19 +1857,22 @@ public class BspServiceMaster<I extends WritableComparable, DataOutput outputStream = new DataOutputStream(byteArrayOutputStream); - String[] splitLocations = inputSplit.getLocations(); - StringBuilder locations = null; - if (splitLocations != null) { - int splitListLength = - Math.min(splitLocations.length, localityLimit); - locations = new StringBuilder(); - for (String location : splitLocations) { - locations.append(location) - .append(--splitListLength > 0 ? 
"\t" : ""); + if (writeLocations) { + String[] splitLocations = inputSplit.getLocations(); + StringBuilder locations = null; + if (splitLocations != null) { + int splitListLength = + Math.min(splitLocations.length, localityLimit); + locations = new StringBuilder(); + for (String location : splitLocations) { + locations.append(location) + .append(--splitListLength > 0 ? "\t" : ""); + } } + Text.writeString(outputStream, + locations == null ? "" : locations.toString()); } - Text.writeString(outputStream, - locations == null ? "" : locations.toString()); + Text.writeString(outputStream, inputSplit.getClass().getName()); ((Writable) inputSplit).write(outputStream); http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java index 21e59bd..f5b054d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java @@ -60,13 +60,14 @@ public class InputSplitPathOrganizer implements Iterable<String> { * @param hostName the worker's host name (for matching) * @param port the port number for this worker * @param threadId id of the input split thread + * @param useLocality whether to prioritize local input splits */ public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper, final String zkPathList, final String hostName, final int port, - final int threadId) + final int threadId, final boolean useLocality) throws KeeperException, InterruptedException { this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true), - hostName, port, threadId); + hostName, port, threadId, useLocality); } /** @@ -77,30 +78,44 @@ public class 
InputSplitPathOrganizer implements Iterable<String> { * @param hostName the worker's host name (for matching) * @param port the port number for this worker * @param threadId id of the input split thread + * @param useLocality whether to prioritize local input splits */ public InputSplitPathOrganizer( final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList, - final String hostName, final int port, final int threadId) + final String hostName, final int port, final int threadId, + final boolean useLocality) throws KeeperException, InterruptedException { this.zooKeeper = zooKeeper; this.pathList = Lists.newArrayList(inputSplitPathList); this.hostName = hostName; - this.baseOffset = 0; // set later after switching out local paths - prioritizeLocalInputSplits(port, threadId); + this.baseOffset = computeBaseOffset(port, threadId); + if (useLocality) { + prioritizeLocalInputSplits(); + } + } + + /** + * Compute base offset to start iterating from, + * in order to avoid collisions with other workers/threads. + * + * @param port the port number for this worker + * @param threadId id of the input split thread + * @return the offset to start iterating from + */ + private int computeBaseOffset(int port, int threadId) { + return pathList.isEmpty() ? 0 : + Math.abs(Objects.hashCode(hostName, port, threadId) % pathList.size()); } - /** + /** * Re-order list of InputSplits so files local to this worker node's * disk are the first it will iterate over when attempting to claim * a split to read. This will increase locality of data reads with greater * probability as the % of total nodes in the cluster hosting data and workers * BOTH increase towards 100%. Replication increases our chances of a "hit." * - * @param port the port number for hashing unique iteration indexes for all - * workers, even those sharing the same host node. 
- * @param threadId id of the input split thread */ - private void prioritizeLocalInputSplits(final int port, final int threadId) { + private void prioritizeLocalInputSplits() { List<String> sortedList = new ArrayList<String>(); String hosts; for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) { @@ -121,12 +136,6 @@ public class InputSplitPathOrganizer implements Iterable<String> { } // shuffle the local blocks in case several workers exist on this host Collections.shuffle(sortedList); - // determine the hash-based offset for this worker to iterate from - // and place the local blocks into the list at that index, if any - final int hashOffset = Objects.hashCode(hostName, port, threadId); - if (pathList.size() != 0) { - baseOffset = Math.abs(hashOffset % pathList.size()); - } // re-insert local paths at "adjusted index zero" for caller to iterate on pathList.addAll(baseOffset, sortedList); } http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index 0f5cdd4..acd4e2d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -18,9 +18,10 @@ package org.apache.giraph.worker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.GraphState; import org.apache.giraph.graph.InputSplitEvents; import 
org.apache.giraph.graph.VertexEdgeCount; @@ -92,6 +93,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable, private final String inputSplitFinishedNode; /** Input split events. */ private final InputSplitEvents inputSplitEvents; + /** Whether to prioritize local input splits. */ + private final boolean useLocality; // CHECKSTYLE: stop ParameterNumberCheck /** @@ -130,10 +133,13 @@ public abstract class InputSplitsCallable<I extends WritableComparable, graphState.getTotalNumVertices(), graphState.getTotalNumEdges(), context, graphState.getGraphMapper(), workerClientRequestProcessor, null); + this.useLocality = configuration.getBoolean( + GiraphConstants.USE_INPUT_SPLIT_LOCALITY, + GiraphConstants.USE_INPUT_SPLIT_LOCALITY_DEFAULT); try { splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt, inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(), - threadId); + threadId, this.useLocality); } catch (KeeperException e) { throw new IllegalStateException( "InputSplitsCallable: KeeperException", e); @@ -377,7 +383,9 @@ public abstract class InputSplitsCallable<I extends WritableComparable, DataInputStream inputStream = new DataInputStream(new ByteArrayInputStream(splitList)); - Text.readString(inputStream); // location data unused here, skip + if (useLocality) { + Text.readString(inputStream); // location data unused here, skip + } String inputSplitClass = Text.readString(inputStream); InputSplit inputSplit = (InputSplit) ReflectionUtils.newInstance( http://git-wip-us.apache.org/repos/asf/giraph/blob/e1a7f290/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java index 56ee5a9..987f51c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java +++ 
b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java @@ -341,8 +341,6 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ @Test public void testInputSplitPathOrganizer() throws IOException, KeeperException, InterruptedException { - final List<String> goodList = new ArrayList<String>(); - Collections.addAll(goodList, "local", "remote1", "remote2"); final List<String> testList = new ArrayList<String>(); Collections.addAll(testList, "remote2", "local", "remote1"); final String localHost = "node.LOCAL.com"; @@ -364,17 +362,18 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ Text.writeString(dos, first); byte[] local = baos.toByteArray(); ZooKeeperExt zk = mock(ZooKeeperExt.class); - when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList); + when(zk.getChildrenExt(testListName, false, false, true)). + thenReturn(testList); when(zk.getData("remote1", false, null)).thenReturn(remote1); when(zk.getData("remote2", false, null)).thenReturn(remote2); when(zk.getData("local", false, null)).thenReturn(local); InputSplitPathOrganizer lis = - new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0); + new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0, true); final List<String> resultList = new ArrayList<String>(); for (String next : lis) { resultList.add(next); } - assertEquals(goodList, resultList); + assertEquals("local", resultList.get(0)); } /**
