Repository: giraph
Updated Branches:
  refs/heads/trunk f732f300f -> dd47ce6ab
GIRAPH-891: Make MessageStoreFactory configurable (rohankarwa via majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/dd47ce6a Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/dd47ce6a Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/dd47ce6a Branch: refs/heads/trunk Commit: dd47ce6abdf01adc3584d6e7e0d41c6749d02d30 Parents: f732f30 Author: Maja Kabiljo <[email protected]> Authored: Fri May 2 14:50:22 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri May 2 14:50:22 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../ByteArrayMessagesPerVertexStore.java | 28 ++++-- .../messages/InMemoryMessageStoreFactory.java | 26 ++++-- .../comm/messages/MessageStoreFactory.java | 22 +++++ .../comm/messages/OneMessagePerVertexStore.java | 28 ++++-- .../out_of_core/DiskBackedMessageStore.java | 51 +--------- .../DiskBackedMessageStoreFactory.java | 97 ++++++++++++++++++++ .../PartitionDiskBackedMessageStore.java | 27 +++++- .../out_of_core/SequentialFileMessageStore.java | 46 +++++++--- .../giraph/comm/netty/NettyWorkerServer.java | 34 ++----- .../org/apache/giraph/conf/GiraphConstants.java | 13 ++- .../giraph/partition/SimplePartition.java | 42 ++++++--- .../apache/giraph/graph/TestVertexAndEdges.java | 2 + .../giraph/jython/TestJythonComputation.java | 19 +++- 14 files changed, 308 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index a3412bb..2b4db33 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-891: Make MessageStoreFactory configurable (rohankarwa via majakabiljo) + GIRAPH-825: Fix DiskBackedPartitionStore to work with 
current trunk (armax00 via claudio) GIRAPH-864: 'mvn clean test' fails for rexster (armax00 via claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java index 2381078..e8b3b30 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java @@ -18,25 +18,25 @@ package org.apache.giraph.comm.messages; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.ExtendedDataInput; import org.apache.giraph.utils.RepresentativeByteArrayIterator; -import org.apache.giraph.utils.VertexIdIterator; import org.apache.giraph.utils.VerboseByteArrayMessageWrite; +import org.apache.giraph.utils.VertexIdIterator; import org.apache.giraph.utils.io.DataInputOutput; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import com.google.common.collect.Iterators; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; - /** * Implementation of {@link SimpleMessageStore} where multiple messages are * stored per vertex as byte arrays. Used when there is no combiner provided. 
@@ -202,9 +202,9 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, private static class Factory<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, MessageStore<I, M>> { /** Service worker */ - private final CentralizedServiceWorker<I, ?, ?> service; + private CentralizedServiceWorker<I, ?, ?> service; /** Hadoop configuration */ - private final ImmutableClassesGiraphConfiguration<I, ?, ?> config; + private ImmutableClassesGiraphConfiguration<I, ?, ?> config; /** * @param service Worker service @@ -222,5 +222,17 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable, return new ByteArrayMessagesPerVertexStore<I, M>(messageValueFactory, service, config); } + + @Override + public void initialize(CentralizedServiceWorker<I, ?, ?> service, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { + this.service = service; + this.config = conf; + } + + @Override + public boolean shouldTraverseMessagesInOrder() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java index 22a41cd..f691d3e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java @@ -51,18 +51,14 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable, Logger.getLogger(InMemoryMessageStoreFactory.class); /** Service worker */ - private final CentralizedServiceWorker<I, ?, ?> service; + private CentralizedServiceWorker<I, ?, ?> service; /** Hadoop configuration */ - private final 
ImmutableClassesGiraphConfiguration<I, ?, ?> conf; + private ImmutableClassesGiraphConfiguration<I, ?, ?> conf; /** - * @param service Worker service - * @param conf Configuration + * Default constructor allowing class invocation via Reflection. */ - public InMemoryMessageStoreFactory(CentralizedServiceWorker<I, ?, ?> service, - ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { - this.service = service; - this.conf = conf; + public InMemoryMessageStoreFactory() { } @Override @@ -113,6 +109,18 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable, (conf.useMessageCombiner() ? " message combiner " + conf.getMessageCombinerClass() : " no combiner")); } - return (MessageStore<I, M>) messageStore; + return messageStore; + } + + @Override + public void initialize(CentralizedServiceWorker<I, ?, ?> service, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { + this.service = service; + this.conf = conf; + } + + @Override + public boolean shouldTraverseMessagesInOrder() { + return false; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java index f582ea2..6149a9c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStoreFactory.java @@ -18,6 +18,8 @@ package org.apache.giraph.comm.messages; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.factories.MessageValueFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -42,4 +44,24 @@ public interface 
MessageStoreFactory<I extends WritableComparable, * @return New message store */ MS newStore(MessageValueFactory<M> messageValueFactory); + + /** + * Implementation class should use this method of initialization + * of any required internal state. + * + * @param service Service to get partition mappings + * @param conf Configuration + */ + void initialize(CentralizedServiceWorker<I, ?, ?> service, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf); + + /** + * This method is more for the performance optimization. If the message + * traversal would be done in order then data structure which is optimized + * for such traversal can be used. + * + * @return true if the messages would be traversed in order + * else return false + */ + boolean shouldTraverseMessagesInOrder(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java index acf68ea..bb581c0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java @@ -18,6 +18,12 @@ package org.apache.giraph.comm.messages; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ConcurrentMap; + import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -26,12 +32,6 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; -import java.io.DataInput; 
-import java.io.DataOutput; -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.ConcurrentMap; - /** * Implementation of {@link SimpleMessageStore} where we have a single * message per vertex. @@ -140,9 +140,9 @@ public class OneMessagePerVertexStore<I extends WritableComparable, M extends Writable> implements MessageStoreFactory<I, M, MessageStore<I, M>> { /** Service worker */ - private final CentralizedServiceWorker<I, ?, ?> service; + private CentralizedServiceWorker<I, ?, ?> service; /** Hadoop configuration */ - private final ImmutableClassesGiraphConfiguration<I, ?, ?> config; + private ImmutableClassesGiraphConfiguration<I, ?, ?> config; /** * @param service Worker service @@ -160,5 +160,17 @@ public class OneMessagePerVertexStore<I extends WritableComparable, return new OneMessagePerVertexStore<I, M>(messageValueFactory, service, config.<M>createMessageCombiner(), config); } + + @Override + public void initialize(CentralizedServiceWorker<I, ?, ?> service, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { + this.service = service; + this.config = conf; + } + + @Override + public boolean shouldTraverseMessagesInOrder() { + return false; + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java index 346e3b3..1a76306 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java @@ -19,6 +19,7 @@ package org.apache.giraph.comm.messages.out_of_core; import com.google.common.collect.Maps; 
+ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.comm.messages.MessageStoreFactory; @@ -27,7 +28,6 @@ import org.apache.giraph.utils.ByteArrayVertexIdMessages; import org.apache.giraph.utils.EmptyIterable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -273,54 +273,13 @@ public class DiskBackedMessageStore<I extends WritableComparable, public static <I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> MessageStoreFactory<I, M, MessageStore<I, M>> newFactory( - CentralizedServiceWorker<I, V, E> service, + CentralizedServiceWorker<I, V, E> service, int maxMessagesInMemory, MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) { - return new Factory<I, V, E, M>(service, maxMessagesInMemory, + return new DiskBackedMessageStoreFactory<I, V, E, M>(service, + maxMessagesInMemory, fileStoreFactory); } - - /** - * Factory for {@link DiskBackedMessageStore} - * - * @param <I> Vertex id - * @param <V> Vertex data - * @param <E> Edge data - * @param <M> Message data - */ - private static class Factory<I extends WritableComparable, - V extends Writable, E extends Writable, M extends Writable> - implements MessageStoreFactory<I, M, MessageStore<I, M>> { - /** Service worker */ - private final CentralizedServiceWorker<I, V, E> service; - /** Number of messages to keep in memory */ - private final int maxMessagesInMemory; - /** Factory for creating file stores when flushing */ - private final - MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> - fileStoreFactory; - - /** - * @param service Service worker - * @param maxMessagesInMemory Number of messages to keep in memory - * @param fileStoreFactory Factory for creating file stores when - * flushing - */ - public 
Factory(CentralizedServiceWorker<I, V, E> service, - int maxMessagesInMemory, - MessageStoreFactory<I, M, PartitionDiskBackedMessageStore<I, M>> - fileStoreFactory) { - this.service = service; - this.maxMessagesInMemory = maxMessagesInMemory; - this.fileStoreFactory = fileStoreFactory; - } - - @Override - public MessageStore<I, M> newStore( - MessageValueFactory<M> messageValueFactory) { - return new DiskBackedMessageStore<I, V, E, M>(messageValueFactory, - service, maxMessagesInMemory, fileStoreFactory); - } - } } + http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java new file mode 100644 index 0000000..f2b31c0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStoreFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.comm.messages.out_of_core; + +import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY; + +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStore; +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Message store factory which persist the messages on the disk. + * + * @param <I> vertex id + * @param <V> vertex data + * @param <E> edge data + * @param <M> message data + */ +public class DiskBackedMessageStoreFactory<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + implements MessageStoreFactory<I, M, MessageStore<I, M>> { + /** Service worker */ + private CentralizedServiceWorker<I, V, E> service; + /** Number of messages to keep in memory */ + private int maxMessagesInMemory; + /** Factory for creating file stores when flushing */ + private MessageStoreFactory<I, M, + PartitionDiskBackedMessageStore<I, M>> fileStoreFactory; + + /** + * Default constructor class helps in class invocation via Reflection + */ + public DiskBackedMessageStoreFactory() { + } + + /** + * @param service Service worker + * @param maxMessagesInMemory Number of messages to keep in memory + * @param fileStoreFactory Factory for creating file stores when flushing + */ + public DiskBackedMessageStoreFactory( + CentralizedServiceWorker<I, V, E> service, + int maxMessagesInMemory, + MessageStoreFactory<I, M, + PartitionDiskBackedMessageStore<I, M>> fileStoreFactory) { + this.service = service; + this.maxMessagesInMemory = maxMessagesInMemory; + this.fileStoreFactory = fileStoreFactory; + } + + @Override + public MessageStore<I, M> + newStore(MessageValueFactory<M> messageValueFactory) { + return new 
DiskBackedMessageStore<I, V, E, M>(messageValueFactory, + service, maxMessagesInMemory, fileStoreFactory); + } + + @Override + public void initialize(CentralizedServiceWorker service, + ImmutableClassesGiraphConfiguration conf) { + this.maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf); + + MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>> + fileMessageStoreFactory = + SequentialFileMessageStore.newFactory(conf); + this.fileStoreFactory = + PartitionDiskBackedMessageStore.newFactory(conf, + fileMessageStoreFactory); + + this.service = service; + } + + @Override + public boolean shouldTraverseMessagesInOrder() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java index 7d46d30..bece774 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/PartitionDiskBackedMessageStore.java @@ -18,9 +18,6 @@ package org.apache.giraph.comm.messages.out_of_core; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -34,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.MessageStoreFactory; import 
org.apache.giraph.comm.messages.MessagesIterable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -42,6 +40,10 @@ import org.apache.giraph.utils.io.DataInputOutput; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * Message storage with in-memory map of messages and with support for * flushing all the messages to the disk. Holds messages for a single partition. @@ -263,7 +265,7 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable, // read destination vertices int numVertices = in.readInt(); for (int v = 0; v < numVertices; v++) { - I vertexId = (I) config.createVertexId(); + I vertexId = config.createVertexId(); vertexId.readFields(in); destinationVertices.add(vertexId); } @@ -343,5 +345,22 @@ public class PartitionDiskBackedMessageStore<I extends WritableComparable, return new PartitionDiskBackedMessageStore<I, M>(messageValueFactory, config, fileStoreFactory); } + + @Override + public void initialize(CentralizedServiceWorker<I, ?, ?> service, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { + /* Implementation of this method is required if the class is to + * be exposed publicly and allow instantiating the class via the + * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is + * a private class, hence the implementation of this method is skipped + * as the caller knows the specific required constructor parameters + * for instantiation. 
+ */ + } + + @Override + public boolean shouldTraverseMessagesInOrder() { + return true; + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java index 51c05da..5988459 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/SequentialFileMessageStore.java @@ -18,19 +18,7 @@ package org.apache.giraph.comm.messages.out_of_core; -import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.MessagesIterable; -import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.factories.MessageValueFactory; -import org.apache.giraph.utils.EmptyIterable; -import org.apache.giraph.utils.io.DataInputOutput; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.log4j.Logger; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; +import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; @@ -49,7 +37,20 @@ import java.util.Map; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.giraph.conf.GiraphConstants.MESSAGES_DIRECTORY; +import org.apache.giraph.bsp.CentralizedServiceWorker; +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.MessagesIterable; +import 
org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.utils.EmptyIterable; +import org.apache.giraph.utils.io.DataInputOutput; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; /** * Used for writing and reading collection of messages to the disk. @@ -413,5 +414,22 @@ public class SequentialFileMessageStore<I extends WritableComparable, return new SequentialFileMessageStore<I, M>(messageValueFactory, config, bufferSize, fileName); } + + @Override + public void initialize(CentralizedServiceWorker<I, ?, ?> service, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf) { + /* Implementation of this method is required if the class is to + * be exposed publicly and allow instantiating the class via the + * configuration parameter MESSAGE_STORE_FACTORY_CLASS. As this is + * a private class, hence the implementation of this method is skipped + * as the caller knows the specific required constructor parameters + * for instantiation. 
+ */ + } + + @Override + public boolean shouldTraverseMessagesInOrder() { + return true; + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java index 4f6c17b..adb96cb 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java @@ -21,18 +21,15 @@ package org.apache.giraph.comm.netty; import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.WorkerServer; -import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStore; -import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.comm.messages.MessageStoreFactory; -import org.apache.giraph.comm.messages.out_of_core.PartitionDiskBackedMessageStore; -import org.apache.giraph.comm.messages.out_of_core.SequentialFileMessageStore; import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.graph.VertexMutations; import org.apache.giraph.graph.VertexResolver; import org.apache.giraph.partition.Partition; +import org.apache.giraph.utils.ReflectionUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -46,8 +43,7 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map.Entry; -import static org.apache.giraph.conf.GiraphConstants.MAX_MESSAGES_IN_MEMORY; -import static 
org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES; +import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS; /** * Netty worker server that implement {@link WorkerServer} and contains @@ -107,24 +103,14 @@ public class NettyWorkerServer<I extends WritableComparable, */ private MessageStoreFactory<I, Writable, MessageStore<I, Writable>> createMessageStoreFactory() { - boolean useOutOfCoreMessaging = USE_OUT_OF_CORE_MESSAGES.get(conf); - if (!useOutOfCoreMessaging) { - return new InMemoryMessageStoreFactory<I, Writable>(service, conf); - } else { - int maxMessagesInMemory = MAX_MESSAGES_IN_MEMORY.get(conf); - if (LOG.isInfoEnabled()) { - LOG.info("createMessageStoreFactory: Using DiskBackedMessageStore, " + - "maxMessagesInMemory = " + maxMessagesInMemory); - } - MessageStoreFactory<I, Writable, SequentialFileMessageStore<I, Writable>> - fileStoreFactory = SequentialFileMessageStore.newFactory(conf); - MessageStoreFactory<I, Writable, - PartitionDiskBackedMessageStore<I, Writable>> - partitionStoreFactory = - PartitionDiskBackedMessageStore.newFactory(conf, fileStoreFactory); - return DiskBackedMessageStore.newFactory(service, - maxMessagesInMemory, partitionStoreFactory); - } + Class<? 
extends MessageStoreFactory> messageStoreFactoryClass = + MESSAGE_STORE_FACTORY_CLASS.get(conf); + + MessageStoreFactory messageStoreFactoryInstance = + ReflectionUtils.newInstance(messageStoreFactoryClass); + messageStoreFactoryInstance.initialize(service, conf); + + return messageStoreFactoryInstance; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index e257b4a..e791d62 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -20,6 +20,8 @@ package org.apache.giraph.conf; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.aggregators.TextAggregatorWriter; import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; +import org.apache.giraph.comm.messages.MessageStoreFactory; import org.apache.giraph.edge.ByteArrayEdges; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.factories.ComputationFactory; @@ -92,6 +94,13 @@ public interface GiraphConstants { TypesHolder.class, "TypesHolder, used if Computation not set - optional"); + /** Message Store Factory */ + ClassConfOption<MessageStoreFactory> MESSAGE_STORE_FACTORY_CLASS = + ClassConfOption.create("giraph.messageStoreFactoryClass", + InMemoryMessageStoreFactory.class, + MessageStoreFactory.class, + "Message Store Factory Class that is to be used"); + /** Language user's graph types are implemented in */ PerGraphTypeEnumConfOption<Language> GRAPH_TYPE_LANGUAGES = PerGraphTypeEnumConfOption.create("giraph.types.language", @@ -833,10 +842,6 @@ public interface GiraphConstants { "Comma-separated 
list of directories in the local file system for " + "out-of-core messages."); - /** Whether or not to use out-of-core messages */ - BooleanConfOption USE_OUT_OF_CORE_MESSAGES = - new BooleanConfOption("giraph.useOutOfCoreMessages", false, - "Whether or not to use out-of-core messages"); /** * If using out-of-core messaging, it tells how much messages do we keep * in memory. http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java index 1609846..de2ffd4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java +++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java @@ -18,15 +18,8 @@ package org.apache.giraph.partition; -import com.google.common.collect.Maps; -import org.apache.giraph.edge.Edge; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.utils.WritableUtils; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.util.Progressable; +import static org.apache.giraph.conf.GiraphConstants.MESSAGE_STORE_FACTORY_CLASS; -import javax.annotation.concurrent.ThreadSafe; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -34,7 +27,18 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import static org.apache.giraph.conf.GiraphConstants.USE_OUT_OF_CORE_MESSAGES; +import javax.annotation.concurrent.ThreadSafe; + +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.ReflectionUtils; +import 
org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; + +import com.google.common.collect.Maps; /** * A simple map-based container that stores vertices. Vertex ids will map to @@ -60,7 +64,7 @@ public class SimplePartition<I extends WritableComparable, @Override public void initialize(int partitionId, Progressable progressable) { super.initialize(partitionId, progressable); - if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) { + if (shouldTraverseMessageInOrder()) { vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>(); } else { vertexMap = Maps.newConcurrentMap(); @@ -141,7 +145,7 @@ public class SimplePartition<I extends WritableComparable, @Override public void readFields(DataInput input) throws IOException { super.readFields(input); - if (USE_OUT_OF_CORE_MESSAGES.get(getConf())) { + if (shouldTraverseMessageInOrder()) { vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E>>(); } else { vertexMap = Maps.newConcurrentMap(); @@ -173,4 +177,20 @@ public class SimplePartition<I extends WritableComparable, public Iterator<Vertex<I, V, E>> iterator() { return vertexMap.values().iterator(); } + + /** + * This method specifies if the message store factory, that is been + * configured, has requirement of traversing messages in order. + * + * @return true if the message store factory has specified traversing + * messages in ordered, else return false. + */ + private boolean shouldTraverseMessageInOrder() { + Class<? 
extends MessageStoreFactory> messageStoreFactoryClass = + MESSAGE_STORE_FACTORY_CLASS.get(getConf()); + + MessageStoreFactory messageStoreFactoryInstance = + ReflectionUtils.newInstance(messageStoreFactoryClass); + return messageStoreFactoryInstance.shouldTraverseMessagesInOrder(); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java index b404646..86d75a3 100644 --- a/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java +++ b/giraph-core/src/test/java/org/apache/giraph/graph/TestVertexAndEdges.java @@ -19,6 +19,8 @@ package org.apache.giraph.graph; import com.google.common.collect.Lists; +import org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.ArrayListEdges; http://git-wip-us.apache.org/repos/asf/giraph/blob/dd47ce6a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java index 5feabaf..49a338c 100644 --- a/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java +++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJythonComputation.java @@ -17,7 +17,11 @@ */ package org.apache.giraph.jython; +import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory; +import 
org.apache.giraph.comm.messages.MessageStoreFactory; +import org.apache.giraph.comm.messages.out_of_core.DiskBackedMessageStoreFactory; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.GiraphTypes; import org.apache.giraph.edge.ByteArrayEdges; import org.apache.giraph.graph.Language; @@ -37,8 +41,19 @@ import java.util.Map; import static org.junit.Assert.assertEquals; public class TestJythonComputation { + + @Test + public void testCountEdgesDiskBackedMessageStoreFactory() throws Exception { + testCountEdges(DiskBackedMessageStoreFactory.class); + } + @Test - public void testCountEdges() throws Exception { + public void testCountEdgesInMemoryMessageStoreFactory() throws Exception { + testCountEdges(InMemoryMessageStoreFactory.class); + } + + public void testCountEdges(Class<? extends MessageStoreFactory> + messageStoreFactoryClass) throws Exception { String[] edges = new String[] { "1 2", "2 3", @@ -57,6 +72,8 @@ public class TestJythonComputation { conf.setOutEdgesClass(ByteArrayEdges.class); conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class); conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + GiraphConstants.MESSAGE_STORE_FACTORY_CLASS.set(conf, + messageStoreFactoryClass); Iterable<String> results = InternalVertexRunner.run(conf, null, edges); Map<Integer, Integer> values = parseResults(results);
