Author: edwardyoon
Date: Thu Apr 30 00:11:23 2015
New Revision: 1676879
URL: http://svn.apache.org/r1676879
Log:
HAMA-946: stores vertices in serialized form
Added:
hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java
Removed:
hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java
Modified:
hama/trunk/core/pom.xml
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
hama/trunk/pom.xml
Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Thu Apr 30 00:11:23 2015
@@ -149,16 +149,6 @@
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.directmemory</groupId>
- <artifactId>directmemory-cache</artifactId>
- <version>0.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.directmemory</groupId>
- <artifactId>directmemory-kryo</artifactId>
- <version>0.2</version>
- </dependency>
- <dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.21.Final</version>
Added: hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java?rev=1676879&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/WritableUtils.java Thu Apr 30 00:11:23 2015
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class WritableUtils {
+
+ public static byte[] serialize(Writable w) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutput output = new DataOutputStream(out);
+ try {
+ w.write(output);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return out.toByteArray();
+ }
+
+ public static void deserialize(byte[] bytes, Writable obj) {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+ try {
+ obj.readFields(in);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Apr 30 00:11:23 2015
@@ -18,9 +18,7 @@
package org.apache.hama.graph;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
@@ -50,6 +48,7 @@ import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.util.ReflectionUtils;
+import org.apache.hama.util.WritableUtils;
/**
* Fully generic graph job runner.
@@ -239,6 +238,7 @@ public final class GraphJobRunner<V exte
* iterate over our messages and vertices in sorted order. That means that we
* need to seek the first vertex that has the same ID as the iterated
message.
*/
+ @SuppressWarnings("unchecked")
private void doSuperstep(GraphJobMessage currentMessage,
BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
throws IOException {
@@ -269,7 +269,9 @@ public final class GraphJobRunner<V exte
LOG.error(e);
}
- for (Vertex<V, E, M> vertex : vertices.getValues()) {
+ Iterator it = vertices.iterator();
+ while (it.hasNext()) {
+ Vertex<V, E, M> vertex = (Vertex<V, E, M>) it.next();
if (!vertex.isHalted() && !vertex.isComputed()) {
vertex.compute(Collections.<M> emptyList());
vertices.finishVertexComputation(vertex);
@@ -305,7 +307,7 @@ public final class GraphJobRunner<V exte
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
executor.setRejectedExecutionHandler(retryHandler);
- for (Vertex<V, E, M> v : vertices.getValues()) {
+ for (V v : vertices.keySet()) {
Runnable worker = new ComputeRunnable(v);
executor.execute(worker);
}
@@ -328,13 +330,23 @@ public final class GraphJobRunner<V exte
@SuppressWarnings("unchecked")
public ComputeRunnable(GraphJobMessage msg) {
- this.vertex = vertices.get((V) msg.getVertexId());
+ try {
+ this.vertex = vertices.get((V) msg.getVertexId());
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
this.msgs = (Iterable<M>) getIterableMessages(msg.getValuesBytes(),
msg.getNumOfValues());
}
- public ComputeRunnable(Vertex<V, E, M> v) {
- this.vertex = v;
+ public ComputeRunnable(V v) {
+ try {
+ this.vertex = vertices.get(v);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
}
@Override
@@ -434,24 +446,26 @@ public final class GraphJobRunner<V exte
executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
executor.setRejectedExecutionHandler(retryHandler);
- try {
- KeyValuePair<Writable, Writable> next = null;
- while ((next = peer.readNext()) != null) {
- Vertex<V, E, M> vertex = GraphJobRunner
- .<V, E, M> newVertexInstance(VERTEX_CLASS);
-
- boolean vertexFinished = reader.parseVertex(next.getKey(),
- next.getValue(), vertex);
+ KeyValuePair<Writable, Writable> next = null;
+ while ((next = peer.readNext()) != null) {
+ Vertex<V, E, M> vertex = GraphJobRunner
+ .<V, E, M> newVertexInstance(VERTEX_CLASS);
- if (!vertexFinished) {
- continue;
- }
- Runnable worker = new Parser(vertex);
- executor.execute(worker);
+ boolean vertexFinished = false;
+ try {
+ vertexFinished = reader.parseVertex(next.getKey(), next.getValue(),
+ vertex);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ if (!vertexFinished) {
+ continue;
}
- } catch (Exception e) {
- e.printStackTrace();
+
+ Runnable worker = new Parser(vertex);
+ executor.execute(worker);
+
}
executor.shutdown();
@@ -526,7 +540,7 @@ public final class GraphJobRunner<V exte
if (peer.getPeerIndex() == partition) {
addVertex(vertex);
} else {
- messages.get(partition).add(serialize(vertex));
+ messages.get(partition).add(WritableUtils.serialize(vertex));
}
} catch (Exception e) {
e.printStackTrace();
@@ -544,7 +558,6 @@ public final class GraphJobRunner<V exte
vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
}
- vertex.setRunner(this);
vertices.put(vertex);
}
@@ -690,8 +703,8 @@ public final class GraphJobRunner<V exte
if (combiner != null && e.getValue().getNumOfValues() > 1) {
GraphJobMessage combined = new GraphJobMessage(e.getKey(),
- serialize(combiner.combine(getIterableMessages(e.getValue()
- .getValuesBytes(), e.getValue().getNumOfValues()))));
+ WritableUtils.serialize(combiner.combine(getIterableMessages(e
+ .getValue().getValuesBytes(),
e.getValue().getNumOfValues()))));
combined.setFlag(GraphJobMessage.VERTEX_FLAG);
peer.send(getHostName(e.getKey()), combined);
} else {
@@ -707,14 +720,6 @@ public final class GraphJobRunner<V exte
}
}
- public static byte[] serialize(Writable writable) throws IOException {
- ByteArrayOutputStream a = new ByteArrayOutputStream();
- DataOutputStream b = new DataOutputStream(a);
- writable.write(b);
- a.close();
- return a.toByteArray();
- }
-
public Iterable<Writable> getIterableMessages(final byte[] valuesBytes,
final int numOfValues) {
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/MapVerticesInfo.java Thu Apr 30 00:11:23 2015
@@ -18,7 +18,6 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,6 +26,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.WritableUtils;
/**
* Stores the vertices into a memory-based tree map. This implementation allows
@@ -40,23 +40,29 @@ import org.apache.hama.bsp.TaskAttemptID
*/
public final class MapVerticesInfo<V extends WritableComparable<V>, E extends
Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
- private final ConcurrentHashMap<V, Vertex<V, E, M>> vertices = new
ConcurrentHashMap<V, Vertex<V, E, M>>();
+ private final ConcurrentHashMap<V, byte[]> vertices = new
ConcurrentHashMap<V, byte[]>();
+
+ private GraphJobRunner<V, E, M> runner;
private int activeVertices = 0;
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
+ this.runner = runner;
}
@Override
public void put(Vertex<V, E, M> vertex) throws IOException {
if (!vertices.containsKey(vertex.getVertexID())) {
- vertices.putIfAbsent(vertex.getVertexID(), vertex);
+ vertices.putIfAbsent(vertex.getVertexID(),
+ WritableUtils.serialize(vertex));
} else {
+ Vertex<V, E, M> v = this.get(vertex.getVertexID());
for (Edge<V, E> e : vertex.getEdges()) {
- vertices.get(vertex.getVertexID()).addEdge(e);
+ v.addEdge(e);
}
+ vertices.put(vertex.getVertexID(), WritableUtils.serialize(v));
}
}
@@ -70,41 +76,44 @@ public final class MapVerticesInfo<V ext
}
@Override
- public Collection<Vertex<V, E, M>> getValues() {
- return vertices.values();
- }
-
- @Override
public int size() {
return vertices.size();
}
@Override
- public Vertex<V, E, M> get(V vertexID) {
- return vertices.get(vertexID);
+ public Vertex<V, E, M> get(V vertexID) throws IOException {
+ Vertex<V, E, M> v = GraphJobRunner
+ .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ WritableUtils.deserialize(vertices.get(vertexID), v);
+ v.setRunner(runner);
+
+ return v;
}
@Override
public Iterator<Vertex<V, E, M>> iterator() {
- final Iterator<Vertex<V, E, M>> vertexIterator = vertices.values()
- .iterator();
+ final Iterator<byte[]> it = vertices.values().iterator();
return new Iterator<Vertex<V, E, M>>() {
@Override
public boolean hasNext() {
- return vertexIterator.hasNext();
+ return it.hasNext();
}
@Override
public Vertex<V, E, M> next() {
- return vertexIterator.next();
+ Vertex<V, E, M> v = GraphJobRunner
+ .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+ WritableUtils.deserialize(it.next(), v);
+ v.setRunner(runner);
+ return v;
}
@Override
public void remove() {
- // TODO Auto-generated method stub
+ it.remove();
}
};
@@ -120,6 +129,7 @@ public final class MapVerticesInfo<V ext
throws IOException {
incrementCount();
vertex.setComputed();
+ vertices.put(vertex.getVertexID(), WritableUtils.serialize(vertex));
}
public synchronized void incrementCount() {
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu Apr 30 00:11:23 2015
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.Counters.Counter;
+import org.apache.hama.util.WritableUtils;
/**
* Vertex is a abstract definition of Google Pregel Vertex. For implementing a
@@ -65,6 +66,9 @@ public abstract class Vertex<V extends W
return runner.getPeer().getConfiguration();
}
+ public Vertex() {
+ }
+
@Override
public V getVertexID() {
return this.vertexID;
@@ -77,18 +81,18 @@ public abstract class Vertex<V extends W
@Override
public void sendMessage(Edge<V, E> e, M msg) throws IOException {
runner.sendMessage(e.getDestinationVertexID(),
- GraphJobRunner.serialize(msg));
+ WritableUtils.serialize(msg));
}
@Override
public void sendMessage(V destinationVertexID, M msg) throws IOException {
- runner.sendMessage(destinationVertexID, GraphJobRunner.serialize(msg));
+ runner.sendMessage(destinationVertexID, WritableUtils.serialize(msg));
}
@Override
public void sendMessageToNeighbors(M msg) throws IOException {
final List<Edge<V, E>> outEdges = this.getEdges();
- byte[] serialized = GraphJobRunner.serialize(msg);
+ byte[] serialized = WritableUtils.serialize(msg);
for (Edge<V, E> e : outEdges) {
runner.sendMessage(e.getDestinationVertexID(), serialized);
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Thu Apr 30 00:11:23 2015
@@ -18,7 +18,6 @@
package org.apache.hama.graph;
import java.io.IOException;
-import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
@@ -48,7 +47,7 @@ public interface VerticesInfo<V extends
*/
public void put(Vertex<V, E, M> vertex) throws IOException;
- public Vertex<V, E, M> get(V vertexID);
+ public Vertex<V, E, M> get(V vertexID) throws IOException;
/**
* Remove a vertex to the underlying structure.
@@ -80,8 +79,6 @@ public interface VerticesInfo<V extends
*/
public Set<V> keySet();
- public Collection<Vertex<V, E, M>> getValues();
-
public int getActiveVerticesNum();
/**
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java Thu Apr 30 00:11:23 2015
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hama.util.WritableUtils;
import org.junit.Test;
import com.google.common.collect.Lists;
@@ -57,11 +58,11 @@ public class TestGraphJobMessage extends
public List<GraphJobMessage> getMessages() throws IOException {
GraphJobMessage mapMsg = new GraphJobMessage(new MapWritable());
GraphJobMessage vertexMsg1 = new GraphJobMessage(new Text("1"),
- GraphJobRunner.serialize(new IntWritable()));
+ WritableUtils.serialize(new IntWritable()));
GraphJobMessage vertexMsg2 = new GraphJobMessage(new Text("2"),
- GraphJobRunner.serialize(new IntWritable()));
+ WritableUtils.serialize(new IntWritable()));
GraphJobMessage vertexMsg3 = new GraphJobMessage(new Text("3"),
- GraphJobRunner.serialize(new IntWritable()));
+ WritableUtils.serialize(new IntWritable()));
return Lists.newArrayList(mapMsg, vertexMsg1, vertexMsg2, vertexMsg3);
}
Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1676879&r1=1676878&r2=1676879&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Thu Apr 30 00:11:23 2015
@@ -103,7 +103,6 @@
<log4j.version>1.2.16</log4j.version>
<zookeeper.version>3.4.5</zookeeper.version>
<ant.version>1.7.1</ant.version>
- <kryo.version>2.20</kryo.version>
</properties>
<repositories>
@@ -224,11 +223,6 @@
<artifactId>snappy-java</artifactId>
<version>1.0.5</version>
</dependency>
- <dependency>
- <groupId>com.esotericsoftware.kryo</groupId>
- <artifactId>kryo</artifactId>
- <version>${kryo.version}</version>
- </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>