Author: edwardyoon
Date: Wed Jan 8 07:21:58 2014
New Revision: 1556453
URL: http://svn.apache.org/r1556453
Log:
HAMA-837: Add sort behaviour to runtime partitioner
Added:
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Jan 8 07:21:58 2014
@@ -3,7 +3,8 @@ Hama Change Log
Release 0.7.0 (unreleased changes)
NEW FEATURES
-
+
+ HAMA-837: Add sort behaviour to runtime partitioner (edwardyoon)
HAMA-827: Add NamedVector (edwardyoon)
HAMA-822: Add feature transformer interface to improve the power and
flexibility of existing machine learning model (Yexi Jiang)
HAMA-774: CompositeInputFormat in Hama (Martin Illecker)
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Wed Jan 8
07:21:58 2014
@@ -120,6 +120,8 @@ public interface Constants {
public static final String RUNTIME_PARTITIONING_CLASS =
"bsp.input.partitioner.class";
public static final String RUNTIME_DESIRED_PEERS_COUNT =
"desired.num.of.tasks";
public static final String RUNTIME_PARTITION_RECORDCONVERTER =
"bsp.runtime.partition.recordconverter";
+
+ public static final String PARTITION_SORT_BY_KEY =
"bsp.partition.sort.by.converted.record";
// /////////////////////////////////////
// Constants for ZooKeeper
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
Wed Jan 8 07:21:58 2014
@@ -20,9 +20,10 @@ package org.apache.hama.bsp;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,10 +31,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.sync.SyncException;
@@ -49,7 +52,6 @@ public class PartitioningRunner extends
private FileSystem fs = null;
private Path partitionDir;
private RecordConverter converter;
- private Map<Integer, LinkedList<KeyValuePair<Writable, Writable>>> values =
new HashMap<Integer, LinkedList<KeyValuePair<Writable, Writable>>>();
private PipesPartitioner<?, ?> pipesPartitioner = null;
@Override
@@ -72,7 +74,6 @@ public class PartitioningRunner extends
} else {
this.partitionDir = new
Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
}
-
}
/**
@@ -97,20 +98,10 @@ public class PartitioningRunner extends
KeyValuePair<Writable, Writable> inputRecord, Configuration conf);
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
- @SuppressWarnings("rawtypes") Partitioner partitioner,
- Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
- int numTasks);
-
- /**
- * @return a map implementation, so order can be changed in subclasses if
- * needed.
- */
- public Map<Writable, Writable> newMap();
-
- /**
- * @return a list implementation, so order will not be changed in
subclasses
- */
- public List<KeyValuePair<Writable, Writable>> newList();
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks);
}
/**
@@ -127,34 +118,32 @@ public class PartitioningRunner extends
@SuppressWarnings("unchecked")
@Override
public int getPartitionId(KeyValuePair<Writable, Writable> outputRecord,
- @SuppressWarnings("rawtypes") Partitioner partitioner,
- Configuration conf, @SuppressWarnings("rawtypes") BSPPeer peer,
- int numTasks) {
+ @SuppressWarnings("rawtypes")
+ Partitioner partitioner, Configuration conf,
+ @SuppressWarnings("rawtypes")
+ BSPPeer peer, int numTasks) {
return Math.abs(partitioner.getPartition(outputRecord.getKey(),
outputRecord.getValue(), numTasks));
}
@Override
public void setup(Configuration conf) {
-
- }
-
- @Override
- public Map<Writable, Writable> newMap() {
- return new HashMap<Writable, Writable>();
}
- @Override
- public List<KeyValuePair<Writable, Writable>> newList() {
- return new LinkedList<KeyValuePair<Writable, Writable>>();
- }
}
+ public Map<Integer, SequenceFile.Writer> writerCache = new HashMap<Integer,
SequenceFile.Writer>();
+
+ @SuppressWarnings("rawtypes")
+ public SortedMap<WritableComparable, KeyValuePair<IntWritable,
KeyValuePair>> sortedMap = new TreeMap<WritableComparable,
KeyValuePair<IntWritable, KeyValuePair>>();
+
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public void bsp(
BSPPeer<Writable, Writable, Writable, Writable, NullWritable> peer)
throws IOException, SyncException, InterruptedException {
+
+ int peerNum = peer.getNumPeers();
Partitioner partitioner = getPartitioner();
KeyValuePair<Writable, Writable> pair = null;
KeyValuePair<Writable, Writable> outputPair = null;
@@ -166,7 +155,6 @@ public class PartitioningRunner extends
keyClass = pair.getKey().getClass();
valueClass = pair.getValue().getClass();
}
-
outputPair = converter.convertRecord(pair, conf);
if (outputPair == null) {
@@ -176,74 +164,153 @@ public class PartitioningRunner extends
int index = converter.getPartitionId(outputPair, partitioner, conf, peer,
desiredNum);
- LinkedList<KeyValuePair<Writable, Writable>> list = values.get(index);
- if (list == null) {
- list = (LinkedList<KeyValuePair<Writable, Writable>>) converter
- .newList();
- values.put(index, list);
+ // if key is comparable and it need to be sorted by key,
+ if (outputPair.getKey() instanceof WritableComparable
+ && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
+ sortedMap.put(
+ (WritableComparable) outputPair.getKey(),
+ new KeyValuePair(new IntWritable(index), new KeyValuePair(pair
+ .getKey(), pair.getValue())));
+ } else {
+ if (!writerCache.containsKey(index)) {
+ Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
+ + peer.getPeerIndex());
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ destFile, keyClass, valueClass, CompressionType.NONE);
+ writerCache.put(index, writer);
+ }
+
+ writerCache.get(index).append(pair.getKey(), pair.getValue());
}
- list.add(new KeyValuePair<Writable, Writable>(pair.getKey(), pair
- .getValue()));
}
- // The reason of use of Memory is to reduce file opens
- for (Map.Entry<Integer, LinkedList<KeyValuePair<Writable, Writable>>> e :
values
- .entrySet()) {
- Path destFile = new Path(partitionDir + "/part-" + e.getKey() + "/file-"
- + peer.getPeerIndex());
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- destFile, keyClass, valueClass, CompressionType.NONE);
+ if (sortedMap.size() > 0) {
+ writeSortedFile(peer.getPeerIndex(), keyClass, valueClass);
+ }
- for (KeyValuePair<Writable, Writable> v : e.getValue()) {
- writer.append(v.getKey(), v.getValue());
- }
- writer.close();
+ for (SequenceFile.Writer w : writerCache.values()) {
+ w.close();
}
peer.sync();
FileStatus[] status = fs.listStatus(partitionDir);
- // To avoid race condition, we should store the peer number
- int peerNum = peer.getNumPeers();
// Call sync() one more time to avoid concurrent access
peer.sync();
- // merge files into one.
- // TODO if we use header info, we might able to merge files without full
- // scan.
for (FileStatus stat : status) {
int partitionID = Integer
.parseInt(stat.getPath().getName().split("[-]")[1]);
- // TODO set replica factor to 1.
if (getMergeProcessorID(partitionID, peerNum) == peer.getPeerIndex()) {
- Path partitionFile = new Path(partitionDir + "/"
+ Path destinationFilePath = new Path(partitionDir + "/"
+ getPartitionName(partitionID));
FileStatus[] files = fs.listStatus(stat.getPath());
+ if (outputPair.getKey() instanceof WritableComparable
+ && conf.getBoolean(Constants.PARTITION_SORT_BY_KEY, false)) {
+ mergeSortedFiles(files, destinationFilePath, keyClass, valueClass);
+ } else {
+ mergeFiles(files, destinationFilePath, keyClass, valueClass);
+ }
+ fs.delete(stat.getPath(), true);
+ }
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ private void writeSortedFile(int peerIndex, Class keyClass, Class valueClass)
+ throws IOException {
+ for (Entry<WritableComparable, KeyValuePair<IntWritable, KeyValuePair>> e
: sortedMap
+ .entrySet()) {
+ int index = ((IntWritable) e.getValue().getKey()).get();
+ KeyValuePair rawRecord = e.getValue().getValue();
+
+ if (!writerCache.containsKey(index)) {
+ Path destFile = new Path(partitionDir + "/part-" + index + "/file-"
+ + peerIndex);
SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
- partitionFile, keyClass, valueClass, CompressionType.NONE);
+ destFile, keyClass, valueClass, CompressionType.NONE);
+ writerCache.put(index, writer);
+ }
- for (int i = 0; i < files.length; i++) {
- LOG.debug("merge '" + files[i].getPath() + "' into " + partitionDir
- + "/" + getPartitionName(partitionID));
-
- SequenceFile.Reader reader = new SequenceFile.Reader(fs,
- files[i].getPath(), conf);
-
- Writable key = (Writable) ReflectionUtils.newInstance(keyClass,
conf);
- Writable value = (Writable) ReflectionUtils.newInstance(valueClass,
- conf);
-
- while (reader.next(key, value)) {
- writer.append(key, value);
- }
- reader.close();
- }
+ writerCache.get(index).append(rawRecord.getKey(), rawRecord.getValue());
+ }
- writer.close();
- fs.delete(stat.getPath(), true);
+ sortedMap.clear();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void mergeSortedFiles(FileStatus[] status, Path destinationFilePath,
+ Class keyClass, Class valueClass) throws IOException {
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ destinationFilePath, keyClass, valueClass, CompressionType.NONE);
+ KeyValuePair outputPair = null;
+ Writable key;
+ Writable value;
+
+ Map<Integer, SequenceFile.Reader> readers = new HashMap<Integer,
SequenceFile.Reader>();
+ for (int i = 0; i < status.length; i++) {
+ readers.put(i, new SequenceFile.Reader(fs, status[i].getPath(), conf));
+ }
+
+ for (int i = 0; i < readers.size(); i++) {
+ key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+ value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+ readers.get(i).next(key, value);
+ KeyValuePair record = new KeyValuePair(key, value);
+ outputPair = converter.convertRecord(record, conf);
+ sortedMap.put((WritableComparable) outputPair.getKey(), new KeyValuePair(
+ new IntWritable(i), record));
+ }
+
+ while (readers.size() > 0) {
+ key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+ value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+ WritableComparable firstKey = sortedMap.firstKey();
+ KeyValuePair kv = sortedMap.get(firstKey);
+ int readerIndex = ((IntWritable) kv.getKey()).get();
+ KeyValuePair rawRecord = (KeyValuePair) kv.getValue();
+ writer.append(rawRecord.getKey(), rawRecord.getValue());
+
+ sortedMap.remove(firstKey);
+
+ if (readers.get(readerIndex).next(key, value)) {
+ KeyValuePair record = new KeyValuePair(key, value);
+ outputPair = converter.convertRecord(record, conf);
+ sortedMap.put((WritableComparable) outputPair.getKey(),
+ new KeyValuePair(new IntWritable(readerIndex), record));
+ } else {
+ readers.get(readerIndex).close();
+ readers.remove(readerIndex);
+ }
+ }
+
+ sortedMap.clear();
+ writer.close();
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void mergeFiles(FileStatus[] status, Path destinationFilePath,
+ Class keyClass, Class valueClass) throws IOException {
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf,
+ destinationFilePath, keyClass, valueClass, CompressionType.NONE);
+ Writable key;
+ Writable value;
+
+ for (int i = 0; i < status.length; i++) {
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+ status[i].getPath(), conf);
+ key = (Writable) ReflectionUtils.newInstance(keyClass, conf);
+ value = (Writable) ReflectionUtils.newInstance(valueClass, conf);
+
+ while (reader.next(key, value)) {
+ writer.append(key, value);
}
+ reader.close();
}
+ writer.close();
}
@Override
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
(original)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/DynamicGraph.java
Wed Jan 8 07:21:58 2014
@@ -33,10 +33,13 @@ import org.apache.hama.bsp.TextOutputFor
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.graph.GraphJobRunner.GraphJobCounter;
+import org.apache.hama.graph.MapVerticesInfo;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexInputReader;
/**
+ * NOTE: Graph modification APIs can be used only with {@link MapVerticesInfo}.
+ *
* This is an example of how to manipulate Graphs dynamically. The input of
this
* example is a number in each row. We assume that there is a vertex with ID:1
* which is responsible to create a sum vertex that will aggregate the values
of
@@ -134,9 +137,9 @@ public class DynamicGraph {
private static GraphJob createJob(String[] args, HamaConfiguration conf)
throws IOException {
- // NOTE Graph modification APIs can be used only with in-memory vertices
storage.
+ // NOTE: Graph modification APIs can be used only with MapVerticesInfo.
conf.set("hama.graph.vertices.info",
- "org.apache.hama.graph.ListVerticesInfo");
+ "org.apache.hama.graph.MapVerticesInfo");
GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
graphJob.setJobName("Dynamic Graph");
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/DynamicGraphTest.java
Wed Jan 8 07:21:58 2014
@@ -31,13 +31,13 @@ import org.apache.hama.HamaConfiguration
import org.junit.Test;
/**
- * Testcase for {@link org.apache.hama.examples.DynamicGraph}
+ * Unit test for {@link org.apache.hama.examples.DynamicGraph}
*/
public class DynamicGraphTest extends TestCase {
private static String OUTPUT = "/tmp/page-out";
private Configuration conf = new HamaConfiguration();
private FileSystem fs;
-
+
private void deleteTempDirs() {
try {
if (fs.exists(new Path(OUTPUT)))
@@ -61,7 +61,7 @@ public class DynamicGraphTest extends Te
}
}
}
-
+
@Override
protected void setUp() throws Exception {
super.setUp();
@@ -69,9 +69,10 @@ public class DynamicGraphTest extends Te
}
@Test
- public void test() throws IOException, InterruptedException,
ClassNotFoundException {
+ public void test() throws IOException, InterruptedException,
+ ClassNotFoundException {
try {
- DynamicGraph.main(new String[] {"src/test/resources/dg.txt", OUTPUT });
+ DynamicGraph.main(new String[] { "src/test/resources/dg.txt", OUTPUT });
verifyResult();
} finally {
deleteTempDirs();
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
Wed Jan 8 07:21:58 2014
@@ -132,5 +132,4 @@ public class MindistSearchTest extends T
e.printStackTrace();
}
}
-
-}
+}
\ No newline at end of file
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
Wed Jan 8 07:21:58 2014
@@ -36,6 +36,14 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.IDSkippingIterator.Strategy;
+/**
+ * Stores the sorted vertices into a local file. It doesn't allow modification
+ * and random access by vertexID.
+ *
+ * @param <V>
+ * @param <E>
+ * @param <M>
+ */
@SuppressWarnings("rawtypes")
public final class DiskVerticesInfo<V extends WritableComparable, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
@@ -122,7 +130,8 @@ public final class DiskVerticesInfo<V ex
@Override
public void removeVertex(V vertexID) {
- throw new UnsupportedOperationException("Not yet implemented");
+ throw new UnsupportedOperationException(
+ "DiskVerticesInfo doesn't support this operation. Please use the
MapVerticesInfo.");
}
/**
@@ -176,7 +185,8 @@ public final class DiskVerticesInfo<V ex
@Override
public void finishRemovals() {
- throw new UnsupportedOperationException("Not yet implemented");
+ throw new UnsupportedOperationException(
+ "DiskVerticesInfo doesn't support this operation. Please use the
MapVerticesInfo.");
}
private static long[] copy(ArrayList<Long> lst) {
@@ -378,5 +388,4 @@ public final class DiskVerticesInfo<V ex
private static String getSoftGraphFileName(String root, int step) {
return root + "soft_" + step + ".graph";
}
-
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed Jan
8 07:21:58 2014
@@ -57,6 +57,7 @@ public class GraphJob extends BSPJob {
public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
throws IOException {
super(conf);
+ this.setBoolean(Constants.PARTITION_SORT_BY_KEY, true);
this.setBspClass(GraphJobRunner.class);
this.setJarByClass(exampleClass);
this.setVertexIDClass(Text.class);
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Wed Jan 8 07:21:58 2014
@@ -255,7 +255,7 @@ public final class GraphJobRunner<V exte
* currentMessage or the first vertex that is active.
*/
IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
-
+
// note that can't skip inactive vertices because we have to rewrite the
// complete vertex file in each iteration
while (iterator.hasNext(
@@ -268,11 +268,11 @@ public final class GraphJobRunner<V exte
iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
vertex, peer);
}
-
+
if (iterable != null && vertex.isHalted()) {
vertex.setActive();
}
-
+
if (!vertex.isHalted()) {
M lastValue = vertex.getValue();
if (iterable == null) {
@@ -289,7 +289,7 @@ public final class GraphJobRunner<V exte
getAggregationRunner().aggregateVertex(lastValue, vertex);
activeVertices++;
}
-
+
// note that we even need to rewrite the vertex if it is halted for
// consistency reasons
vertices.finishVertexComputation(vertex);
@@ -356,7 +356,7 @@ public final class GraphJobRunner<V exte
IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
while (skippingIterator.hasNext()) {
Vertex<V, E, M> vertex = skippingIterator.next();
-
+
M lastValue = vertex.getValue();
// Calls setup method.
vertex.setup(conf);
@@ -403,7 +403,7 @@ public final class GraphJobRunner<V exte
getAggregationRunner().setupAggregators(peer);
Class<? extends VerticesInfo<V, E, M>> verticesInfoClass = (Class<?
extends VerticesInfo<V, E, M>>) conf
- .getClass("hama.graph.vertices.info", ListVerticesInfo.class,
+ .getClass("hama.graph.vertices.info", DiskVerticesInfo.class,
VerticesInfo.class);
vertices = ReflectionUtils.newInstance(verticesInfoClass);
vertices.init(this, conf, peer.getTaskId());
@@ -453,7 +453,7 @@ public final class GraphJobRunner<V exte
while ((record = peer.readNext()) != null) {
converted = converter.convertRecord(record, conf);
- currentVertex = (Vertex<V, E, M>) converted.getKey();
+ currentVertex = (Vertex<V, E, M>) converted.getValue();
if (vertex.getVertexID() == null) {
vertex = currentVertex;
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
Wed Jan 8 07:21:58 2014
@@ -17,14 +17,16 @@
*/
package org.apache.hama.graph;
+import static com.google.common.base.Preconditions.checkArgument;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -32,7 +34,8 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.TaskAttemptID;
/**
- * VerticesInfo encapsulates the storage of vertices in a BSP Task.
+ * Stores the serialized vertices into a memory-based list. It doesn't allow
+ * modification and random access by vertexID.
*
* @param <V> Vertex ID object type
* @param <E> Edge cost object type
@@ -43,7 +46,9 @@ public final class ListVerticesInfo<V ex
private GraphJobRunner<V, E, M> runner;
Vertex<V, E, M> v;
- private final Map<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+ private final List<byte[]> verticesList = new ArrayList<byte[]>();
+ private boolean lockedAdditions = false;
+ private int index = 0;
private ByteArrayOutputStream bos = null;
private DataOutputStream dos = null;
@@ -58,37 +63,32 @@ public final class ListVerticesInfo<V ex
@Override
public void addVertex(Vertex<V, E, M> vertex) throws IOException {
- if (verticesMap.containsKey(vertex.getVertexID())) {
- throw new UnsupportedOperationException("Vertex with ID: "
- + vertex.getVertexID() + " already exists!");
- } else {
- verticesMap.put(vertex.getVertexID(), serialize(vertex));
- }
+ // messages must be added in sorted order to work this out correctly
+ checkArgument(!lockedAdditions,
+ "Additions are locked now, nobody is allowed to change the structure
anymore.");
+
+ verticesList.add(serialize(vertex));
}
@Override
public void removeVertex(V vertexID) throws UnsupportedOperationException {
- if (verticesMap.containsKey(vertexID)) {
- verticesMap.remove(vertexID);
- } else {
- throw new UnsupportedOperationException("Vertex with ID: " + vertexID
- + " not found on this peer.");
- }
+ throw new UnsupportedOperationException(
+ "ListVerticesInfo doesn't support this operation. Please use the
MapVerticesInfo.");
}
public void clear() {
- verticesMap.clear();
+ verticesList.clear();
}
@Override
public int size() {
- return this.verticesMap.size();
+ return this.verticesList.size();
}
@Override
public IDSkippingIterator<V, E, M> skippingIterator() {
return new IDSkippingIterator<V, E, M>() {
- Iterator<V> it = verticesMap.keySet().iterator();
+ Iterator<byte[]> it = verticesList.iterator();
@Override
public boolean hasNext(V msgId,
@@ -96,13 +96,13 @@ public final class ListVerticesInfo<V ex
throws IOException {
if (it.hasNext()) {
- V vertexID = it.next();
- v = deserialize(vertexID, verticesMap.get(vertexID));
+ byte[] serialized = it.next();
+ v = deserialize(serialized);
while (!strat.accept(v, msgId)) {
if (it.hasNext()) {
- vertexID = it.next();
- v = deserialize(vertexID, verticesMap.get(vertexID));
+ serialized = it.next();
+ v = deserialize(serialized);
} else {
return false;
}
@@ -131,42 +131,38 @@ public final class ListVerticesInfo<V ex
}
public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
- v = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- v.setEdges(vertex.getEdges());
- v.setValue(vertex.getValue());
- if(vertex.isHalted()) {
- v.voteToHalt();
- }
bos = new ByteArrayOutputStream();
dos = new DataOutputStream(bos);
- v.write(dos);
+ vertex.write(dos);
return bos.toByteArray();
}
- public Vertex<V, E, M> deserialize(V vertexID, byte[] serialized) throws
IOException {
+ public Vertex<V, E, M> deserialize(byte[] serialized) throws IOException {
bis = new ByteArrayInputStream(serialized);
dis = new DataInputStream(bis);
v = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
v.readFields(dis);
v.setRunner(runner);
- v.setVertexID(vertexID);
return v;
}
@Override
public void finishVertexComputation(Vertex<V, E, M> vertex)
throws IOException {
- verticesMap.put(vertex.getVertexID(), serialize(vertex));
+ verticesList.set(index, serialize(vertex));
+ index++;
}
@Override
public void finishAdditions() {
-
+ lockedAdditions = true;
}
@Override
public void finishRemovals() {
+ throw new UnsupportedOperationException(
+ "ListVerticesInfo doesn't support this operation. Please use the
MapVerticesInfo.");
}
@Override
@@ -182,6 +178,6 @@ public final class ListVerticesInfo<V ex
@Override
public void startSuperstep() throws IOException {
-
+ index = 0;
}
}
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1556453&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
(added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
Wed Jan 8 07:21:58 2014
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Stores the vertices as serialized byte arrays in an in-memory
+ * {@link TreeMap}, sorted by vertex ID. This implementation supports runtime
+ * graph modification (adding/removing vertices between supersteps) and random
+ * access by vertex ID.
+ *
+ * Note: holding every serialized vertex on the heap can be memory-inefficient
+ * for large graphs; see DiskVerticesInfo for a disk-backed alternative.
+ *
+ * This class keeps per-call scratch state in instance fields and is therefore
+ * not safe for concurrent use.
+ *
+ * @param <V> Vertex ID object type
+ * @param <E> Edge cost object type
+ * @param <M> Vertex value object type
+ */
+public final class MapVerticesInfo<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
+ implements VerticesInfo<V, E, M> {
+ private GraphJobRunner<V, E, M> runner;
+ // Scratch vertex shared between deserialize() and the skipping iterator's
+ // hasNext()/next() handshake; null-ness is used as the "consumed" marker.
+ Vertex<V, E, M> v;
+
+ // Backing store: vertex ID -> serialized vertex bytes, kept sorted by ID.
+ private final SortedMap<V, byte[]> verticesMap = new TreeMap<V, byte[]>();
+
+ // Reusable (de)serialization streams; reassigned on every call.
+ private ByteArrayOutputStream bos = null;
+ private DataOutputStream dos = null;
+ private ByteArrayInputStream bis = null;
+ private DataInputStream dis = null;
+
+ // Only keeps the runner reference; the backing TreeMap needs no setup.
+ @Override
+ public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
+ TaskAttemptID attempt) throws IOException {
+ this.runner = runner;
+ }
+
+ /**
+ * Serializes and stores the given vertex, keyed (and sorted) by its ID.
+ *
+ * @throws UnsupportedOperationException if a vertex with the same ID
+ * already exists on this peer.
+ */
+ @Override
+ public void addVertex(Vertex<V, E, M> vertex) throws IOException {
+ if (verticesMap.containsKey(vertex.getVertexID())) {
+ throw new UnsupportedOperationException("Vertex with ID: "
+ + vertex.getVertexID() + " already exists!");
+ } else {
+ verticesMap.put(vertex.getVertexID(), serialize(vertex));
+ }
+ }
+
+ /**
+ * Removes the vertex with the given ID immediately (no deferred batch).
+ *
+ * @throws UnsupportedOperationException if no such vertex exists on this
+ * peer.
+ */
+ @Override
+ public void removeVertex(V vertexID) throws UnsupportedOperationException {
+ if (verticesMap.containsKey(vertexID)) {
+ verticesMap.remove(vertexID);
+ } else {
+ throw new UnsupportedOperationException("Vertex with ID: " + vertexID
+ + " not found on this peer.");
+ }
+ }
+
+ /** Drops all vertices held by this peer. */
+ public void clear() {
+ verticesMap.clear();
+ }
+
+ /** @return number of vertices currently stored on this peer. */
+ @Override
+ public int size() {
+ return this.verticesMap.size();
+ }
+
+ /**
+ * Returns an iterator over the vertices in ascending vertex-ID order (the
+ * TreeMap's sort order). hasNext() deserializes vertices and skips past any
+ * the given strategy rejects; next() must always be preceded by a
+ * successful hasNext() call.
+ */
+ @Override
+ public IDSkippingIterator<V, E, M> skippingIterator() {
+ return new IDSkippingIterator<V, E, M>() {
+ Iterator<V> it = verticesMap.keySet().iterator();
+
+ @Override
+ public boolean hasNext(V msgId,
+ org.apache.hama.graph.IDSkippingIterator.Strategy strat)
+ throws IOException {
+
+ if (it.hasNext()) {
+ V vertexID = it.next();
+ v = deserialize(vertexID, verticesMap.get(vertexID));
+
+ // Advance until the strategy accepts a vertex or we run out.
+ while (!strat.accept(v, msgId)) {
+ if (it.hasNext()) {
+ vertexID = it.next();
+ v = deserialize(vertexID, verticesMap.get(vertexID));
+ } else {
+ return false;
+ }
+ }
+
+ return true;
+ } else {
+ v = null;
+ return false;
+ }
+ }
+
+ @Override
+ public Vertex<V, E, M> next() {
+ if (v == null) {
+ throw new UnsupportedOperationException(
+ "You must invoke hasNext before ask for the next vertex.");
+ }
+
+ // Hand the cached vertex out exactly once, then require hasNext().
+ Vertex<V, E, M> tmp = v;
+ v = null;
+ return tmp;
+ }
+
+ };
+ }
+
+ /** Serializes a vertex to a byte array via its Writable write(). */
+ public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
+ bos = new ByteArrayOutputStream();
+ dos = new DataOutputStream(bos);
+ vertex.write(dos);
+ return bos.toByteArray();
+ }
+
+ /**
+ * Rebuilds a vertex from its serialized bytes and re-attaches the runner.
+ * NOTE(review): readFields may already restore an ID from the bytes;
+ * setVertexID then re-applies the map key -- confirm the two always agree.
+ */
+ public Vertex<V, E, M> deserialize(V vertexID, byte[] serialized)
+ throws IOException {
+ bis = new ByteArrayInputStream(serialized);
+ dis = new DataInputStream(bis);
+ v = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+
+ v.readFields(dis);
+ v.setRunner(runner);
+ v.setVertexID(vertexID);
+ return v;
+ }
+
+ /** Writes back the (possibly mutated) vertex after its compute() call. */
+ @Override
+ public void finishVertexComputation(Vertex<V, E, M> vertex)
+ throws IOException {
+ verticesMap.put(vertex.getVertexID(), serialize(vertex));
+ }
+
+ // No-op: additions are applied eagerly in addVertex().
+ @Override
+ public void finishAdditions() {
+ }
+
+ // No-op: removals are applied eagerly in removeVertex().
+ @Override
+ public void finishRemovals() {
+ }
+
+ // No-op: no per-superstep state to flush for the in-memory map.
+ @Override
+ public void finishSuperstep() {
+ }
+
+ // No-op: nothing to release; the map is garbage-collected with the task.
+ @Override
+ public void cleanup(HamaConfiguration conf, TaskAttemptID attempt)
+ throws IOException {
+
+ }
+
+ // No-op: iteration state lives in each skippingIterator() instance.
+ @Override
+ public void startSuperstep() throws IOException {
+
+ }
+}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Wed Jan 8
07:21:58 2014
@@ -17,8 +17,12 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -65,7 +69,7 @@ public abstract class Vertex<V extends W
@Override
public V getVertexID() {
- return vertexID;
+ return this.vertexID;
}
@Override
@@ -299,10 +303,10 @@ public abstract class Vertex<V extends W
@Override
public void readFields(DataInput in) throws IOException {
if (in.readBoolean()) {
- if (vertexID == null) {
- vertexID = GraphJobRunner.createVertexIDObject();
+ if (this.vertexID == null) {
+ this.vertexID = GraphJobRunner.createVertexIDObject();
}
- vertexID.readFields(in);
+ this.vertexID.readFields(in);
}
if (in.readBoolean()) {
if (this.value == null) {
@@ -402,4 +406,17 @@ public abstract class Vertex<V extends W
protected GraphJobRunner<V, E, M> getRunner() {
return runner;
}
+
+ /**
+ * Returns a deep copy of this vertex produced by round-tripping it through
+ * its Writable serialization (write then readFields), so the copy shares
+ * no mutable state with this instance.
+ *
+ * NOTE(review): only serialized state is restored -- the copy's runner
+ * reference is not set here; confirm callers do not rely on getRunner()
+ * of the returned vertex.
+ *
+ * @return a freshly deserialized copy of this vertex
+ * @throws IOException if (de)serialization fails
+ */
+ public Vertex<V, E, M> deepCopy() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ this.write(dos);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+ DataInputStream dis = new DataInputStream(bis);
+
+ Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ vertex.readFields(dis);
+ return vertex;
+ }
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInputReader.java
Wed Jan 8 07:21:58 2014
@@ -17,15 +17,9 @@
*/
package org.apache.hama.graph;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.bsp.BSPPeer;
@@ -85,8 +79,9 @@ public abstract class VertexInputReader<
if (!vertexCreation) {
return null;
}
- outputRecord.setKey(vertex);
- outputRecord.setValue(NullWritable.get());
+
+ outputRecord.setKey(vertex.getVertexID());
+ outputRecord.setValue(vertex);
return outputRecord;
}
@@ -94,20 +89,10 @@ public abstract class VertexInputReader<
@Override
public int getPartitionId(KeyValuePair<Writable, Writable> inputRecord,
Partitioner partitioner, Configuration conf, BSPPeer peer, int numTasks)
{
- Vertex<V, E, M> vertex = (Vertex<V, E, M>) outputRecord.getKey();
+ Vertex<V, E, M> vertex = (Vertex<V, E, M>) outputRecord.getValue();
+
return Math.abs(partitioner.getPartition(vertex.getVertexID(),
vertex.getValue(), numTasks));
}
- // final because we don't want vertices to change ordering
- @Override
- public final Map<Writable, Writable> newMap() {
- return new TreeMap<Writable, Writable>();
- }
-
- @Override
- public List<KeyValuePair<Writable, Writable>> newList() {
- return new LinkedList<KeyValuePair<Writable,Writable>>();
- }
-
}
Modified:
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java?rev=1556453&r1=1556452&r2=1556453&view=diff
==============================================================================
---
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
(original)
+++
hama/trunk/ml/src/main/java/org/apache/hama/ml/semiclustering/SemiClusteringVertex.java
Wed Jan 8 07:21:58 2014
@@ -18,15 +18,21 @@
package org.apache.hama.ml.semiclustering;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.Vertex;
-import java.io.IOException;
-import java.util.*;
-
/**
* SemiClusteringVertex Class defines each vertex in a Graph job and the
* compute() method is the function which is applied on each Vertex in the
graph
@@ -57,6 +63,7 @@ public class SemiClusteringVertex extend
if (this.getSuperstepCount() == 0) {
firstSuperStep();
}
+
if (this.getSuperstepCount() >= 1) {
Set<SemiClusterMessage> scListContainThis = new
TreeSet<SemiClusterMessage>();
Set<SemiClusterMessage> scListNotContainThis = new
TreeSet<SemiClusterMessage>();
@@ -104,13 +111,14 @@ public class SemiClusteringVertex extend
* @throws java.io.IOException
*/
public void firstSuperStep() throws IOException {
- Vertex<Text, DoubleWritable, SemiClusterMessage> v = this;
+ Vertex<Text, DoubleWritable, SemiClusterMessage> v = this.deepCopy();
List<Vertex<Text, DoubleWritable, SemiClusterMessage>> lV = new
ArrayList<Vertex<Text, DoubleWritable, SemiClusterMessage>>();
lV.add(v);
String newClusterName = "C" + createNewSemiClusterName(lV);
SemiClusterMessage initialClusters = new SemiClusterMessage(newClusterName,
lV, 1);
sendMessageToNeighbors(initialClusters);
+
Set<SemiClusterDetails> scList = new TreeSet<SemiClusterDetails>();
scList.add(new SemiClusterDetails(newClusterName, 1.0));
SemiClusterMessage vertexValue = new SemiClusterMessage(scList);
@@ -192,6 +200,7 @@ public class SemiClusteringVertex extend
while (vertexItrator.hasNext()) {
vertexId.add(vertexItrator.next().getVertexID().toString());
}
+
return vertexId;
}
@@ -202,7 +211,7 @@ public class SemiClusteringVertex extend
public boolean isVertexInSc(SemiClusterMessage msg) {
List<String> vertexId = getSemiClusterVerticesIdList(msg.getVertexList());
return vertexId.contains(this.getVertexID().toString())
- && vertexId.size() < semiClusterMaximumVertexCount;
+ && vertexId.size() < semiClusterMaximumVertexCount;
}
/**