Author: edwardyoon
Date: Fri Mar 13 03:00:42 2015
New Revision: 1666350
URL: http://svn.apache.org/r1666350
Log:
HAMA-932: Use of Kryo serialization
Modified:
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Modified:
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java
URL:
http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
---
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java
(original)
+++
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleMatrix.java
Fri Mar 13 03:00:42 2015
@@ -29,10 +29,12 @@ import com.google.common.base.Preconditi
*/
public final class DenseDoubleMatrix implements DoubleMatrix {
- protected final double[][] matrix;
- protected final int numRows;
- protected final int numColumns;
+ protected double[][] matrix;
+ protected int numRows;
+ protected int numColumns;
+ public DenseDoubleMatrix() { }
+
/**
* Creates a new empty matrix from the rows and columns.
*
Modified:
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java
URL:
http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
---
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java
(original)
+++
hama/trunk/commons/src/main/java/org/apache/hama/commons/math/DenseDoubleVector.java
Fri Mar 13 03:00:42 2015
@@ -33,8 +33,10 @@ import com.google.common.collect.Abstrac
*/
public final class DenseDoubleVector implements DoubleVector {
- private final double[] vector;
+ private double[] vector;
+ public DenseDoubleVector() { }
+
/**
* Creates a new vector with the given length.
*/
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Fri
Mar 13 03:00:42 2015
@@ -20,16 +20,17 @@ package org.apache.hama.bsp;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
/**
* BSPMessageBundle stores a group of messages so that they can be sent in
batch
@@ -44,8 +45,9 @@ public class BSPMessageBundle<M extends
private String className = null;
private int bundleSize = 0;
- private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
- private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+ private Kryo kryo = new Kryo();
+ private ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ private Output output = new Output(outputStream, 4096);
public BSPMessageBundle() {
bundleSize = 0;
@@ -57,42 +59,37 @@ public class BSPMessageBundle<M extends
* @param message BSPMessage to add.
*/
public void addMessage(M message) {
- try {
- if (className == null) {
- className = message.getClass().getName();
- }
-
- message.write(bufferDos);
- bundleSize++;
- } catch (IOException e) {
- LOG.error(e);
+ if (className == null) {
+ className = message.getClass().getName();
+ kryo.register(message.getClass());
}
+
+ kryo.writeObject(output, message);
+ output.flush();
+
+ bundleSize++;
}
public byte[] getBuffer() {
- return byteBuffer.toByteArray();
+ return outputStream.toByteArray();
}
private ByteArrayInputStream bis = null;
- private DataInputStream dis = null;
+ private Input in = null;
public Iterator<M> iterator() {
- bis = new ByteArrayInputStream(byteBuffer.toByteArray());
- dis = new DataInputStream(bis);
+ bis = new ByteArrayInputStream(outputStream.toByteArray());
+ in = new Input(bis, 4096);
Iterator<M> it = new Iterator<M>() {
Class<M> clazz = null;
- M msg;
+ int counter = 0;
@Override
public boolean hasNext() {
- try {
- if (dis.available() > 0) {
- return true;
- } else {
- return false;
- }
- } catch (IOException e) {
+ if ((bundleSize - counter) > 0) {
+ return true;
+ } else {
return false;
}
}
@@ -104,17 +101,13 @@ public class BSPMessageBundle<M extends
if (clazz == null) {
clazz = (Class<M>) Class.forName(className);
}
-
- msg = ReflectionUtils.newInstance(clazz, null);
- msg.readFields(dis);
-
- } catch (IOException ie) {
- LOG.error(ie);
} catch (ClassNotFoundException ce) {
LOG.error("Class was not found.", ce);
}
- return msg;
+ counter++;
+
+ return kryo.readObject(in, clazz);
}
@Override
@@ -134,7 +127,7 @@ public class BSPMessageBundle<M extends
* @throws IOException
*/
public long getLength() {
- return byteBuffer.size();
+ return outputStream.size();
}
@Override
@@ -142,8 +135,8 @@ public class BSPMessageBundle<M extends
out.writeInt(bundleSize);
if (bundleSize > 0) {
out.writeUTF(className);
- out.writeInt(byteBuffer.size());
- out.write(byteBuffer.toByteArray());
+ out.writeInt(outputStream.size());
+ out.write(outputStream.toByteArray());
}
}
@@ -156,7 +149,7 @@ public class BSPMessageBundle<M extends
int bytesLength = in.readInt();
byte[] temp = new byte[bytesLength];
in.readFully(temp);
- bufferDos.write(temp);
+ outputStream.write(temp);
}
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
(original)
+++ hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
Fri Mar 13 03:00:42 2015
@@ -19,32 +19,39 @@ package org.apache.hama.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import junit.framework.TestCase;
import org.apache.hadoop.io.DoubleWritable;
-public class TestKryoSerializer extends TestCase {
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
- ByteArrayOutputStream mbos = null;
- DataOutputStream mdos = null;
- ByteArrayInputStream mbis = null;
- DataInputStream mdis = null;
+public class TestKryoSerializer extends TestCase {
public void testSerialization() throws Exception {
- KryoSerializer k = new KryoSerializer(DoubleWritable.class);
+ Kryo kryo = new Kryo();
+ kryo.register(DoubleWritable.class);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ Output out = new Output(outputStream, 4096);
+
+ for (int i = 0; i < 10; i++) {
+ DoubleWritable a = new DoubleWritable(i + 0.123);
+ kryo.writeClassAndObject(out, a);
+ out.flush();
+ }
+
+ System.out.println(outputStream.size());
- long startTime = System.currentTimeMillis();
- for (int i = 0; i < 10000000; i++) {
- DoubleWritable x = new DoubleWritable(i + 0.2);
- byte[] bytes = k.serialize(x);
- DoubleWritable y = (DoubleWritable) k.deserialize(bytes);
- assertEquals(x, y);
+ ByteArrayInputStream bin = new
ByteArrayInputStream(outputStream.toByteArray());
+ Input in = new Input(bin, 4096);
+
+ for (int i = 0; i < 10; i++) {
+ DoubleWritable b = (DoubleWritable) kryo.readClassAndObject(in);
+ System.out.println(bin.available() + ", " + b);
}
- System.out.println("Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Fri Mar 13 03:00:42 2015
@@ -17,11 +17,14 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.DataInputBuffer;
@@ -55,7 +58,6 @@ public final class GraphJobMessage imple
private int numOfValues = 0;
private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
- private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
static {
if (comparator == null) {
@@ -92,7 +94,7 @@ public final class GraphJobMessage imple
this.flag = VERTEX_FLAG;
this.vertexId = vertexID;
try {
- this.bufferDos.write(valuesBytes);
+ this.byteBuffer.write(valuesBytes);
} catch (IOException e) {
e.printStackTrace();
}
@@ -114,7 +116,7 @@ public final class GraphJobMessage imple
public void addValuesBytes(byte[] values, int numOfValues) {
try {
- bufferDos.write(values);
+ byteBuffer.write(values);
this.numOfValues += numOfValues;
} catch (IOException e) {
// TODO Auto-generated catch block
@@ -124,10 +126,12 @@ public final class GraphJobMessage imple
public void add(Writable value) {
try {
- value.write(bufferDos);
+ ByteArrayOutputStream a = new ByteArrayOutputStream();
+ DataOutputStream b = new DataOutputStream(a);
+ value.write(b);
+ byteBuffer.write(a.toByteArray());
numOfValues++;
} catch (IOException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
}
@@ -199,7 +203,7 @@ public final class GraphJobMessage imple
int bytesLength = in.readInt();
byte[] temp = new byte[bytesLength];
in.readFully(temp);
- bufferDos.write(temp);
+ byteBuffer.write(temp);
} else if (isMapMessage()) {
map = new MapWritable();
map.readFields(in);
@@ -304,4 +308,41 @@ public final class GraphJobMessage imple
}
}
+
+ public Iterable<Writable> getIterableMessages() {
+
+ return new Iterable<Writable>() {
+ @Override
+ public Iterator<Writable> iterator() {
+ return new Iterator<Writable>() {
+ ByteArrayInputStream bis = new
ByteArrayInputStream(byteBuffer.toByteArray());
+ DataInputStream dis = new DataInputStream(bis);
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return (index < numOfValues) ? true : false;
+ }
+
+ @Override
+ public Writable next() {
+ Writable v = GraphJobRunner.createVertexValue();
+ try {
+ v.readFields(dis);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ index++;
+ return v;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+ }
+
+
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Fri Mar 13 03:00:42 2015
@@ -17,8 +17,6 @@
*/
package org.apache.hama.graph;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
@@ -231,23 +229,18 @@ public final class GraphJobRunner<V exte
notComputedVertices = new HashSet();
notComputedVertices.addAll(vertices.keySet());
- Iterable<Writable> msgs = null;
Vertex<V, E, M> vertex = null;
while (currentMessage != null) {
vertex = vertices.get((V) currentMessage.getVertexId());
- final int numOfValues = currentMessage.getNumOfValues();
- final byte[] serializedMsgs = currentMessage.getValuesBytes();
- msgs = getIterableMessages(numOfValues, serializedMsgs);
-
// reactivation
if (vertex.isHalted()) {
vertex.setActive();
}
if (!vertex.isHalted()) {
- vertex.compute((Iterable<M>) msgs);
+ vertex.compute((Iterable<M>) currentMessage.getIterableMessages());
vertices.finishVertexComputation(vertex);
activeVertices++;
@@ -650,42 +643,6 @@ public final class GraphJobRunner<V exte
return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
}
- public static Iterable<Writable> getIterableMessages(final int numOfValues,
- final byte[] msgBytes) {
-
- return new Iterable<Writable>() {
- @Override
- public Iterator<Writable> iterator() {
- return new Iterator<Writable>() {
- ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
- DataInputStream dis = new DataInputStream(bis);
- int index = 0;
-
- @Override
- public boolean hasNext() {
- return (index < numOfValues) ? true : false;
- }
-
- @Override
- public Writable next() {
- Writable v = GraphJobRunner.createVertexValue();
- try {
- v.readFields(dis);
- } catch (IOException e) {
- e.printStackTrace();
- }
- index++;
- return v;
- }
-
- @Override
- public void remove() {
- }
- };
- }
- };
- }
-
public int getChangedVertexCnt() {
return changedVertexCnt;
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1666350&r1=1666349&r2=1666350&view=diff
==============================================================================
---
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
(original)
+++
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Fri Mar 13 03:00:42 2015
@@ -39,7 +39,6 @@ public class OutgoingVertexMessageManage
.getLog(OutgoingVertexMessageManager.class);
private Combiner<Writable> combiner;
- private Iterable<Writable> msgs;
private HashMap<InetSocketAddress, MessagePerVertex> storage = new
HashMap<InetSocketAddress, MessagePerVertex>();
@SuppressWarnings("unchecked")
@@ -50,7 +49,8 @@ public class OutgoingVertexMessageManage
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
try {
- combiner = (Combiner<Writable>)
ReflectionUtils.newInstance(combinerName);
+ combiner = (Combiner<Writable>) ReflectionUtils
+ .newInstance(combinerName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
@@ -73,13 +73,11 @@ public class OutgoingVertexMessageManage
// Combining messages
if (combiner != null && msgPerVertex.get(vertexID).getNumOfValues() > 1)
{
- final int numOfValues = msgPerVertex.get(vertexID).getNumOfValues();
- final byte[] msgBytes = msgPerVertex.get(vertexID).getValuesBytes();
- msgs = GraphJobRunner.getIterableMessages(numOfValues, msgBytes);
-
// Overwrite
- storage.get(targetPeerAddress).put(vertexID,
- new GraphJobMessage(vertexID, combiner.combine(msgs)));
+ storage.get(targetPeerAddress).put(
+ vertexID,
+ new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
+ vertexID).getIterableMessages())));
}
} else {
outgoingBundles.get(targetPeerAddress).addMessage(msg);