Author: edwardyoon
Date: Wed Aug 27 09:06:50 2014
New Revision: 1620823
URL: http://svn.apache.org/r1620823
Log:
HAMA-915: Add Kryo serializer (edwardyoon)
Added:
hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
(with props)
hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
(with props)
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/pom.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
hama/trunk/pom.xml
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Aug 27 09:06:50 2014
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
NEW FEATURES
+ HAMA-915: Add Kryo serializer (edwardyoon)
HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon)
HAMA-863: Implement SparseVector (Yexi Jiang)
Modified: hama/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Wed Aug 27 09:06:50 2014
@@ -135,6 +135,10 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ </dependency>
</dependencies>
<build>
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed Aug 27 09:06:50 2014
@@ -47,36 +47,26 @@ public class BSPMessageBundle<M extends
private String className = null;
private int bundleSize = 0;
- private int bundleLength = 0;
- ByteArrayOutputStream byteBuffer = null;
- DataOutputStream bufferDos = null;
-
- ByteArrayInputStream bis = null;
- DataInputStream dis = null;
+ private final ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+ private final DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
public BSPMessageBundle() {
- byteBuffer = new ByteArrayOutputStream();
- bufferDos = new DataOutputStream(byteBuffer);
-
bundleSize = 0;
- bundleLength = 0;
}
- ByteArrayOutputStream mbos = null;
- DataOutputStream mdos = null;
+ ByteArrayOutputStream mbos = new ByteArrayOutputStream();
+ DataOutputStream mdos = new DataOutputStream(mbos);
ByteArrayInputStream mbis = null;
DataInputStream mdis = null;
public byte[] serialize(M message) throws IOException {
- mbos = new ByteArrayOutputStream();
- mdos = new DataOutputStream(mbos);
+ mbos.reset();
message.write(mdos);
return mbos.toByteArray();
}
- private byte[] compressed;
- private byte[] serialized;
+ private byte[] msgBytes;
/**
* Add message to this bundle.
@@ -85,46 +75,42 @@ public class BSPMessageBundle<M extends
*/
public void addMessage(M message) {
try {
- serialized = serialize(message);
+ if (className == null) {
+ className = message.getClass().getName();
+ }
+
+ msgBytes = serialize(message);
if (compressor != null) {
- if (serialized.length > threshold) {
+ if (msgBytes.length > threshold) {
bufferDos.writeBoolean(true);
- compressed = compressor.compress(serialized);
- bufferDos.writeInt(compressed.length);
- bufferDos.write(compressed);
- bundleLength += compressed.length;
+ msgBytes = compressor.compress(msgBytes);
+ bufferDos.writeInt(msgBytes.length);
} else {
bufferDos.writeBoolean(false);
- bufferDos.write(serialized);
- bundleLength += serialized.length;
}
- } else {
- bufferDos.write(serialized);
- bundleLength += serialized.length;
}
+ bufferDos.write(msgBytes);
+ bundleSize++;
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error(e);
}
-
- if (className == null) {
- className = message.getClass().getName();
- }
- bundleSize++;
}
public byte[] getBuffer() {
return byteBuffer.toByteArray();
}
+ private ByteArrayInputStream bis = null;
+ private DataInputStream dis = null;
+
public Iterator<M> iterator() {
bis = new ByteArrayInputStream(byteBuffer.toByteArray());
dis = new DataInputStream(bis);
Iterator<M> it = new Iterator<M>() {
+ Class<M> clazz = null;
M msg;
- byte[] decompressed;
@Override
public boolean hasNext() {
@@ -142,43 +128,34 @@ public class BSPMessageBundle<M extends
@SuppressWarnings("unchecked")
@Override
public M next() {
- boolean isCompressed = false;
-
- if (compressor != null) {
- try {
- isCompressed = dis.readBoolean();
- } catch (IOException e1) {
- e1.printStackTrace();
+ try {
+ if (clazz == null) {
+ clazz = (Class<M>) Class.forName(className);
}
- }
- Class<M> clazz = null;
- try {
- clazz = (Class<M>) Class.forName(className);
- } catch (ClassNotFoundException e) {
- LOG.error("Class was not found.", e);
- }
+ msg = ReflectionUtils.newInstance(clazz, null);
+ boolean isCompressed = false;
- msg = ReflectionUtils.newInstance(clazz, null);
+ if (compressor != null) {
+ isCompressed = dis.readBoolean();
+ }
- try {
if (isCompressed) {
- // LOG.debug(">>>>> decompressing .........");
int length = dis.readInt();
- compressed = new byte[length];
- dis.readFully(compressed);
- decompressed = compressor.decompress(compressed);
+ msgBytes = new byte[length];
+ dis.readFully(msgBytes);
- mbis = new ByteArrayInputStream(decompressed);
+ mbis = new ByteArrayInputStream(compressor.decompress(msgBytes));
mdis = new DataInputStream(mbis);
msg.readFields(mdis);
} else {
msg.readFields(dis);
}
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ } catch (IOException ie) {
+ LOG.error(ie);
+ } catch (ClassNotFoundException ce) {
+ LOG.error("Class was not found.", ce);
}
return msg;
@@ -206,7 +183,7 @@ public class BSPMessageBundle<M extends
* @throws IOException
*/
public long getLength() {
- return bundleLength;
+ return byteBuffer.size();
}
@Override
@@ -214,9 +191,8 @@ public class BSPMessageBundle<M extends
out.writeInt(bundleSize);
if (bundleSize > 0) {
out.writeUTF(className);
- byte[] messages = byteBuffer.toByteArray();
- out.writeInt(messages.length);
- out.write(messages);
+ out.writeInt(byteBuffer.size());
+ out.write(byteBuffer.toByteArray());
}
}
Added: hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java?rev=1620823&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java Wed Aug 27 09:06:50 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Serializes and deserializes objects of a single, fixed class with Kryo.
+ * <p>
+ * The Kryo instance and the input/output buffers are reused across calls
+ * without any synchronization, so an instance must not be used by several
+ * threads at once.
+ */
+public class KryoSerializer {
+  /** Initial capacity, in bytes, of the reusable output buffer. */
+  public static final int BUFFER_SIZE = 1024;
+
+  private final Kryo kryo;
+  private final Class<?> clazz;
+  // A max size of -1 lets the output buffer grow as needed; clear() keeps a
+  // grown buffer, so later large objects do not reallocate it again.
+  private final Output output = new Output(BUFFER_SIZE, -1);
+  private final Input input = new Input();
+
+  /**
+   * Creates a serializer for objects of the given class.
+   *
+   * @param clazz the class registered with Kryo and used as the target type
+   *          of {@link #deserialize(byte[])}
+   */
+  public KryoSerializer(Class<?> clazz) {
+    kryo = new Kryo();
+    // No reference tracking: shared objects are written once per occurrence.
+    kryo.setReferences(false);
+    kryo.register(clazz);
+    this.clazz = clazz;
+  }
+
+  /**
+   * Serializes an object into a newly allocated byte array.
+   *
+   * @param obj the object to write, expected to be an instance of the class
+   *          given to the constructor
+   * @return a new array containing exactly the serialized bytes
+   * @throws IOException if Kryo fails to write the object
+   */
+  public byte[] serialize(Object obj) throws IOException {
+    output.clear();
+    try {
+      kryo.writeObject(output, obj);
+    } catch (KryoException e) {
+      throw new IOException("Failed to serialize " + clazz.getName(), e);
+    }
+    return output.toBytes();
+  }
+
+  /**
+   * Deserializes an object previously produced by {@link #serialize(Object)}.
+   *
+   * @param bytes the serialized form
+   * @return a new instance of the class given to the constructor
+   * @throws IOException if Kryo fails to read the object (for example on
+   *           truncated input)
+   */
+  public Object deserialize(byte[] bytes) throws IOException {
+    input.setBuffer(bytes);
+    try {
+      return kryo.readObject(input, clazz);
+    } catch (KryoException e) {
+      throw new IOException("Failed to deserialize " + clazz.getName(), e);
+    }
+  }
+}
Propchange:
hama/trunk/core/src/main/java/org/apache/hama/util/KryoSerializer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java?rev=1620823&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java Wed Aug 27 09:06:50 2014
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Round-trip tests for {@link KryoSerializer}.
+ */
+public class TestKryoSerializer extends TestCase {
+
+  /** Number of round trips; kept small so the unit test stays fast. */
+  private static final int ITERATIONS = 10000;
+
+  public void testSerialization() throws Exception {
+    KryoSerializer k = new KryoSerializer(DoubleWritable.class);
+
+    for (int i = 0; i < ITERATIONS; i++) {
+      DoubleWritable x = new DoubleWritable(i + 0.2);
+      byte[] bytes = k.serialize(x);
+      DoubleWritable y = (DoubleWritable) k.deserialize(bytes);
+      assertEquals(x, y);
+    }
+  }
+
+  /**
+   * An object larger than {@link KryoSerializer#BUFFER_SIZE} must round-trip,
+   * and a smaller object serialized afterwards must not pick up stale bytes.
+   */
+  public void testObjectLargerThanBuffer() throws Exception {
+    KryoSerializer k = new KryoSerializer(BytesWritable.class);
+
+    byte[] payload = new byte[KryoSerializer.BUFFER_SIZE * 4];
+    for (int i = 0; i < payload.length; i++) {
+      payload[i] = (byte) i;
+    }
+    BytesWritable big = new BytesWritable(payload);
+    assertEquals(big, k.deserialize(k.serialize(big)));
+
+    BytesWritable small = new BytesWritable(new byte[] { 1, 2, 3 });
+    assertEquals(small, k.deserialize(k.serialize(small)));
+  }
+}
Propchange:
hama/trunk/core/src/test/java/org/apache/hama/util/TestKryoSerializer.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Wed Aug 27 09:06:50 2014
@@ -79,7 +79,7 @@ public class PageRankTest extends TestCa
private void generateTestData() {
try {
- FastGraphGen.main(new String[] { "400", "10", INPUT, "2" });
+ FastGraphGen.main(new String[] { "4000", "100", INPUT, "3" });
} catch (Exception e) {
e.printStackTrace();
}
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Wed Aug 27 09:06:50 2014
@@ -19,10 +19,6 @@ package org.apache.hama.graph;
import static com.google.common.base.Preconditions.checkArgument;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -32,6 +28,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.util.KryoSerializer;
/**
* Stores the serialized vertices into a memory-based list. It doesn't allow
@@ -50,11 +47,6 @@ public final class ListVerticesInfo<V ex
private boolean lockedAdditions = false;
private int index = 0;
- private ByteArrayOutputStream bos = null;
- private DataOutputStream dos = null;
- private ByteArrayInputStream bis = null;
- private DataInputStream dis = null;
-
@Override
public void init(GraphJobRunner<V, E, M> runner, HamaConfiguration conf,
TaskAttemptID attempt) throws IOException {
@@ -130,19 +122,15 @@ public final class ListVerticesInfo<V ex
};
}
+ private final KryoSerializer kryo = new
KryoSerializer(GraphJobRunner.VERTEX_CLASS);
+
public byte[] serialize(Vertex<V, E, M> vertex) throws IOException {
- bos = new ByteArrayOutputStream();
- dos = new DataOutputStream(bos);
- vertex.write(dos);
- return bos.toByteArray();
+ return kryo.serialize(vertex);
}
+ @SuppressWarnings("unchecked")
public Vertex<V, E, M> deserialize(byte[] serialized) throws IOException {
- bis = new ByteArrayInputStream(serialized);
- dis = new DataInputStream(bis);
- v = GraphJobRunner.<V, E, M>
newVertexInstance(GraphJobRunner.VERTEX_CLASS);
-
- v.readFields(dis);
+ v = (Vertex<V, E, M>) kryo.deserialize(serialized);
v.setRunner(runner);
return v;
}
Modified: hama/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1620823&r1=1620822&r2=1620823&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Wed Aug 27 09:06:50 2014
@@ -101,6 +101,7 @@
<log4j.version>1.2.16</log4j.version>
<zookeeper.version>3.4.5</zookeeper.version>
<ant.version>1.7.1</ant.version>
+ <kryo.version>2.20</kryo.version>
</properties>
<repositories>
@@ -193,9 +194,14 @@
<dependencyManagement>
<dependencies>
<dependency>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- <version>1.0.5</version>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>${kryo.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>