Updated Branches: refs/heads/trunk 9805d231a -> a5bc5bb35
GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a5bc5bb3 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a5bc5bb3 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a5bc5bb3 Branch: refs/heads/trunk Commit: a5bc5bb35a84a2c61623da9269842985e7cc9da5 Parents: 9805d23 Author: Eli Reisman <[email protected]> Authored: Thu Jan 10 16:16:58 2013 -0800 Committer: Eli Reisman <[email protected]> Committed: Thu Jan 10 16:16:58 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/worker/BspServiceWorker.java | 2 +- .../giraph/worker/EdgeInputSplitsCallable.java | 4 +- .../worker/EdgeInputSplitsCallableFactory.java | 3 +- .../giraph/worker/InputSplitPathOrganizer.java | 30 +++++++++------ .../apache/giraph/worker/InputSplitsCallable.java | 5 ++- .../giraph/worker/InputSplitsCallableFactory.java | 3 +- .../giraph/worker/VertexInputSplitsCallable.java | 4 +- .../worker/VertexInputSplitsCallableFactory.java | 3 +- .../test/java/org/apache/giraph/TestBspBasic.java | 18 ++++---- 10 files changed, 46 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 6ec0579..d365bfa 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-473: InputSplitPathOrganizer should be aware of multiple threads (apresta via ereisman) + GIRAPH-478: Bring back jar-with-deps for giraph-hcatalog (nitay) GIRAPH-474: Add an oprtion not to use direct byte buffers. (majakabiljo via ereisman) http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index f33fe58..31a4dc6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -264,7 +264,7 @@ public class BspServiceWorker<I extends WritableComparable, } for (int i = 0; i < numThreads; ++i) { Callable<VertexEdgeCount> inputSplitsCallable = - inputSplitsCallableFactory.newCallable(); + inputSplitsCallableFactory.newCallable(i); threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable)); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 23e2ff7..7d40dfb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -75,6 +75,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, * @param bspServiceWorker service worker * @param inputSplitPathList List of the paths of the input splits * @param workerInfo This worker's info + * @param threadId Id of input split thread * @param zooKeeperExt Handle to ZooKeeperExt */ public EdgeInputSplitsCallable( @@ -84,9 +85,10 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, BspServiceWorker<I, V, E, M> bspServiceWorker, List<String> inputSplitPathList, WorkerInfo workerInfo, + int threadId, ZooKeeperExt zooKeeperExt) { super(context, graphState, configuration, bspServiceWorker, - inputSplitPathList, workerInfo, zooKeeperExt, + inputSplitPathList, workerInfo, threadId, zooKeeperExt, BspServiceWorker.EDGE_INPUT_SPLIT_RESERVED_NODE, BspServiceWorker.EDGE_INPUT_SPLIT_FINISHED_NODE, bspServiceWorker.getEdgeInputSplitsEvents()); http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java index 1a9a744..1adcd73 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallableFactory.java @@ -82,7 +82,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, } @Override - public InputSplitsCallable<I, V, E, M> newCallable() { + public InputSplitsCallable<I, V, E, M> newCallable(int threadId) { return new EdgeInputSplitsCallable<I, V, E, M>( context, graphState, @@ -90,6 +90,7 @@ public class EdgeInputSplitsCallableFactory<I extends WritableComparable, bspServiceWorker, inputSplitPathList, workerInfo, + threadId, zooKeeperExt); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java index b82da7d..21e59bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitPathOrganizer.java @@ -17,17 +17,19 @@ */ package org.apache.giraph.worker; +import com.google.common.base.Objects; import com.google.common.collect.Lists; +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.hadoop.io.Text; +import org.apache.zookeeper.KeeperException; + import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.List; import java.util.Iterator; -import org.apache.giraph.zk.ZooKeeperExt; -import org.apache.hadoop.io.Text; -import org.apache.zookeeper.KeeperException; +import java.util.List; /** * Utility class to extract the list of InputSplits from the @@ -57,12 +59,14 @@ public class InputSplitPathOrganizer implements Iterable<String> { * @param zkPathList the path to read from * @param hostName the worker's host name (for matching) * @param port the port number for this worker + * @param threadId id of the input split thread */ public InputSplitPathOrganizer(final ZooKeeperExt zooKeeper, - final String zkPathList, final String hostName, final int port) + final String zkPathList, final String hostName, final int port, + final int threadId) throws KeeperException, InterruptedException { this(zooKeeper, zooKeeper.getChildrenExt(zkPathList, false, false, true), - hostName, port); + hostName, port, threadId); } /** @@ -72,16 +76,17 @@ public class InputSplitPathOrganizer implements Iterable<String> { * @param inputSplitPathList path of input splits to read from * @param hostName the worker's host name (for matching) * @param port the port number for this worker + * @param threadId id of the input split thread */ public InputSplitPathOrganizer( final ZooKeeperExt zooKeeper, final List<String> inputSplitPathList, - final String hostName, final int port) + final String hostName, final int port, final int threadId) throws KeeperException, InterruptedException { this.zooKeeper = zooKeeper; this.pathList = Lists.newArrayList(inputSplitPathList); this.hostName = hostName; this.baseOffset = 0; // set later after switching out local paths - prioritizeLocalInputSplits(port); + prioritizeLocalInputSplits(port, threadId); } /** @@ -93,10 +98,11 @@ public class InputSplitPathOrganizer implements Iterable<String> { * * @param port the port number for hashing unique iteration indexes for all * workers, even those sharing the same host node. + * @param threadId id of the input split thread */ - private void prioritizeLocalInputSplits(final int port) { + private void prioritizeLocalInputSplits(final int port, final int threadId) { List<String> sortedList = new ArrayList<String>(); - String hosts = null; + String hosts; for (Iterator<String> iterator = pathList.iterator(); iterator.hasNext();) { final String path = iterator.next(); try { @@ -117,9 +123,9 @@ public class InputSplitPathOrganizer implements Iterable<String> { Collections.shuffle(sortedList); // determine the hash-based offset for this worker to iterate from // and place the local blocks into the list at that index, if any - final int temp = hostName.hashCode() + (19 * port); + final int hashOffset = Objects.hashCode(hostName, port, threadId); if (pathList.size() != 0) { - baseOffset = Math.abs(temp % pathList.size()); + baseOffset = Math.abs(hashOffset % pathList.size()); } // re-insert local paths at "adjusted index zero" for caller to iterate on pathList.addAll(baseOffset, sortedList); http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java index ec4780e..0f5cdd4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallable.java @@ -103,6 +103,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable, * @param bspServiceWorker service worker * @param inputSplitPathList List of the paths of the input splits * @param workerInfo This worker's info + * @param threadId Id of input split thread * @param zooKeeperExt Handle to ZooKeeperExt * @param inputSplitReservedNode Path to input split reserved * @param inputSplitFinishedNode Path to input split finsished @@ -115,6 +116,7 @@ public abstract class InputSplitsCallable<I extends WritableComparable, BspServiceWorker<I, V, E, M> bspServiceWorker, List<String> inputSplitPathList, WorkerInfo workerInfo, + int threadId, ZooKeeperExt zooKeeperExt, String inputSplitReservedNode, String inputSplitFinishedNode, @@ -130,7 +132,8 @@ public abstract class InputSplitsCallable<I extends WritableComparable, null); try { splitOrganizer = new InputSplitPathOrganizer(zooKeeperExt, - inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort()); + inputSplitPathList, workerInfo.getHostname(), workerInfo.getPort(), + threadId); } catch (KeeperException e) { throw new IllegalStateException( "InputSplitsCallable: KeeperException", e); http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java index 9e8bc32..cdc6543 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/InputSplitsCallableFactory.java @@ -34,7 +34,8 @@ public interface InputSplitsCallableFactory<I extends WritableComparable, /** * Return a newly-created {@link InputSplitsCallable}. * + * @param threadId Id of input split thread * @return A new {@link InputSplitsCallable} */ - InputSplitsCallable<I, V, E, M> newCallable(); + InputSplitsCallable<I, V, E, M> newCallable(int threadId); } http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 83c8b41..7522027 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -82,6 +82,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable, * @param bspServiceWorker service worker * @param inputSplitPathList List of the paths of the input splits * @param workerInfo This worker's info + * @param threadId Id of input split thread * @param zooKeeperExt Handle to ZooKeeperExt */ public VertexInputSplitsCallable( @@ -91,9 +92,10 @@ public class VertexInputSplitsCallable<I extends WritableComparable, BspServiceWorker<I, V, E, M> bspServiceWorker, List<String> inputSplitPathList, WorkerInfo workerInfo, + int threadId, ZooKeeperExt zooKeeperExt) { super(context, graphState, configuration, bspServiceWorker, - inputSplitPathList, workerInfo, zooKeeperExt, + inputSplitPathList, workerInfo, threadId, zooKeeperExt, BspServiceWorker.VERTEX_INPUT_SPLIT_RESERVED_NODE, BspServiceWorker.VERTEX_INPUT_SPLIT_FINISHED_NODE, bspServiceWorker.getVertexInputSplitsEvents()); http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java index 4bec931..0d617dc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallableFactory.java @@ -82,7 +82,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, } @Override - public InputSplitsCallable<I, V, E, M> newCallable() { + public InputSplitsCallable<I, V, E, M> newCallable(int threadId) { return new VertexInputSplitsCallable<I, V, E, M>( context, graphState, @@ -90,6 +90,7 @@ public class VertexInputSplitsCallableFactory<I extends WritableComparable, bspServiceWorker, inputSplitPathList, workerInfo, + threadId, zooKeeperExt); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/a5bc5bb3/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java index 91d536e..56ee5a9 100644 --- a/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/giraph-core/src/test/java/org/apache/giraph/TestBspBasic.java @@ -342,9 +342,9 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ public void testInputSplitPathOrganizer() throws IOException, KeeperException, InterruptedException { final List<String> goodList = new ArrayList<String>(); - Collections.addAll(goodList, "good", "bad", "ugly"); + Collections.addAll(goodList, "local", "remote1", "remote2"); final List<String> testList = new ArrayList<String>(); - Collections.addAll(testList, "bad", "good", "ugly"); + Collections.addAll(testList, "remote2", "local", "remote1"); final String localHost = "node.LOCAL.com"; final String testListName = "test_list_parent_znode"; // build output just as we do to store hostlists in ZNODES @@ -352,24 +352,24 @@ else[HADOOP_NON_JOBCONTEXT_IS_INTERFACE]*/ DataOutputStream dos = new DataOutputStream(baos); String last = "node.test4.com\tnode.test5.com\tnode.test6.com"; Text.writeString(dos, last); - byte[] LOCALITY_LAST = baos.toByteArray(); + byte[] remote1 = baos.toByteArray(); baos = new ByteArrayOutputStream(); dos = new DataOutputStream(baos); String middle = "node.test1.com\tnode.test2.com\tnode.test3.com"; Text.writeString(dos, middle); - byte[] LOCALITY_MIDDLE = baos.toByteArray(); + byte[] remote2 = baos.toByteArray(); baos = new ByteArrayOutputStream(); dos = new DataOutputStream(baos); String first = "node.testx.com\tnode.LOCAL.com\tnode.testy.com"; Text.writeString(dos, first); - byte[] LOCALITY_FIRST = baos.toByteArray(); + byte[] local = baos.toByteArray(); ZooKeeperExt zk = mock(ZooKeeperExt.class); when(zk.getChildrenExt(testListName, false, false, true)).thenReturn(testList); - when(zk.getData("ugly", false, null)).thenReturn(LOCALITY_LAST); - when(zk.getData("bad", false, null)).thenReturn(LOCALITY_MIDDLE); - when(zk.getData("good", false, null)).thenReturn(LOCALITY_FIRST); + when(zk.getData("remote1", false, null)).thenReturn(remote1); + when(zk.getData("remote2", false, null)).thenReturn(remote2); + when(zk.getData("local", false, null)).thenReturn(local); InputSplitPathOrganizer lis = - new InputSplitPathOrganizer(zk, testListName, localHost, 0); + new InputSplitPathOrganizer(zk, testListName, localHost, 0, 0); final List<String> resultList = new ArrayList<String>(); for (String next : lis) { resultList.add(next);
