Author: tommaso
Date: Sat Sep 21 06:40:13 2013
New Revision: 1525197
URL: http://svn.apache.org/r1525197
Log:
HAMA-732 - applied patch for integration with DirectMemory (DM) for OffHeapVerticesInfo
Modified:
hama/trunk/graph/pom.xml
hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/graph/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
--- hama/trunk/graph/pom.xml (original)
+++ hama/trunk/graph/pom.xml Sat Sep 21 06:40:13 2013
@@ -37,6 +37,16 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-cache</artifactId>
+ <version>0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.directmemory</groupId>
+ <artifactId>directmemory-kryo</artifactId>
+ <version>0.2</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hama</groupId>
<artifactId>hama-core</artifactId>
<version>${project.version}</version>
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/DefaultVertexOutputWriter.java
Sat Sep 21 06:40:13 2013
@@ -45,6 +45,8 @@ public class DefaultVertexOutputWriter<V
public void write(Vertex<V, E, M> vertex,
BSPPeer<Writable, Writable, V, M, GraphJobMessage> peer)
throws IOException {
+ assert vertex.getVertexID() != null : "vertex id cannot be null";
+ assert vertex.getValue() != null : "vertex value cannot be null";
peer.write(vertex.getVertexID(), vertex.getValue());
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
Sat Sep 21 06:40:13 2013
@@ -17,8 +17,6 @@
*/
package org.apache.hama.graph;
-import static com.google.common.base.Preconditions.checkArgument;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
@@ -36,6 +34,8 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.IDSkippingIterator.Strategy;
+import static com.google.common.base.Preconditions.checkArgument;
+
@SuppressWarnings("rawtypes")
public final class DiskVerticesInfo<V extends WritableComparable, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
@@ -188,11 +188,6 @@ public final class DiskVerticesInfo<V ex
}
@Override
- public boolean isFinishedAdditions() {
- return lockedAdditions;
- }
-
- @Override
public void startSuperstep() throws IOException {
index = 0;
String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
@@ -271,7 +266,7 @@ public final class DiskVerticesInfo<V ex
if (cachedVertexInstance == null) {
cachedVertexInstance = GraphJobRunner
.<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
- cachedVertexInstance.runner = runner;
+ cachedVertexInstance.setRunner(runner);
}
ensureVertexIDNotNull();
} catch (IOException e) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Sat Sep 21 06:40:13 2013
@@ -425,7 +425,7 @@ public final class GraphJobRunner<V exte
while ((record = peer.readNext()) != null) {
converted = converter.convertRecord(record, conf);
vertex = (Vertex<V, E, M>) converted.getKey();
- vertex.runner = this;
+ vertex.setRunner(this);
vertex.setup(conf);
if (selfReference) {
@@ -437,7 +437,7 @@ public final class GraphJobRunner<V exte
// Reinitializing vertex object for memory based implementations of
// VerticesInfo
vertex = GraphJobRunner.<V, E, M> newVertexInstance(VERTEX_CLASS);
- vertex.runner = this;
+ vertex.setRunner(this);
}
vertices.finishAdditions();
// finish the "superstep" because we have written a new file here
@@ -453,7 +453,7 @@ public final class GraphJobRunner<V exte
* @throws IOException
*/
private void addVertex(Vertex<V, E, M> vertex) throws IOException {
- vertex.runner = this;
+ vertex.setRunner(this);
vertex.setup(conf);
if (conf.getBoolean("hama.graph.self.ref", false)) {
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
Sat Sep 21 06:40:13 2013
@@ -122,12 +122,7 @@ public final class ListVerticesInfo<V ex
public void finishRemovals() {
}
- @Override
- public boolean isFinishedAdditions() {
- return false;
- }
-
- @Override
+ @Override
public void finishSuperstep() {
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Sat Sep 21
06:40:13 2013
@@ -25,7 +25,6 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -52,7 +51,7 @@ import org.apache.hama.bsp.Partitioner;
public abstract class Vertex<V extends WritableComparable, E extends Writable,
M extends Writable>
implements VertexInterface<V, E, M> {
- GraphJobRunner<?, ?, ?> runner;
+ private transient GraphJobRunner<V, E, M> runner;
private V vertexID;
private M value;
@@ -228,7 +227,6 @@ public abstract class Vertex<V extends W
/**
* @return the configured partitioner instance to message vertices.
*/
- @SuppressWarnings("unchecked")
public Partitioner<V, M> getPartitioner() {
return (Partitioner<V, M>) runner.getPartitioner();
}
@@ -380,4 +378,11 @@ public abstract class Vertex<V extends W
}
+ protected void setRunner(GraphJobRunner<V, E, M> runner) {
+ this.runner = runner;
+ }
+
+ protected GraphJobRunner<V, E, M> getRunner() {
+ return runner;
+ }
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Sat
Sep 21 06:40:13 2013
@@ -86,11 +86,6 @@ public interface VerticesInfo<V extends
throws IOException;
/**
- * @return true of all vertices are added.
- */
- public boolean isFinishedAdditions();
-
- /**
* @return the number of vertices added to the underlying structure.
* Implementations should take care this is a constant time
operation.
*/
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1525197&r1=1525196&r2=1525197&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Sat Sep 21 06:40:13 2013
@@ -18,6 +18,9 @@
package org.apache.hama.graph;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -36,6 +39,7 @@ import org.apache.hama.bsp.TestBSPMaster
import org.apache.hama.bsp.TextArrayWritable;
import org.apache.hama.graph.example.PageRank;
import org.apache.hama.graph.example.PageRank.PagerankSeqReader;
+import org.junit.Before;
public class TestSubmitGraphJob extends TestBSPMasterGroomServer {
@@ -49,6 +53,16 @@ public class TestSubmitGraphJob extends
private static String INPUT = "/tmp/pagerank/real-tmp.seq";
private static String OUTPUT = "/tmp/pagerank/real-out";
+ private static final List<Class<? extends VerticesInfo>> vi = new
ArrayList<Class<? extends VerticesInfo>>();
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ vi.add(ListVerticesInfo.class);
+ vi.add(DiskVerticesInfo.class);
+ vi.add(OffHeapVerticesInfo.class);
+ }
+
@Override
public void testSubmitJob() throws Exception {
@@ -60,6 +74,7 @@ public class TestSubmitGraphJob extends
BSPJobClient jobClient = new BSPJobClient(configuration);
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
configuration.set("hama.graph.self.ref", "true");
+ injectVerticesInfo();
ClusterStatus cluster = jobClient.getClusterStatus(false);
assertEquals(this.numOfGroom, cluster.getGroomServers());
LOG.info("Client finishes execution job.");
@@ -98,6 +113,11 @@ public class TestSubmitGraphJob extends
}
}
+ protected void injectVerticesInfo() {
+ Class<? extends VerticesInfo> verticesInfoClass = vi.get(Math.abs(new
Random().nextInt() % 3));
+ LOG.info("using vertices info of type : "+verticesInfoClass.getName());
+ }
+
private void verifyResult() throws IOException {
double sum = 0.0;
FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));