Author: kennethxian
Date: Fri Jun 14 05:07:49 2013
New Revision: 1492955
URL: http://svn.apache.org/r1492955
Log:
HAMA-761: Improvement for GraphJobMessage comparation operation
Modified:
hama/trunk/CHANGES.txt
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1492955&r1=1492954&r2=1492955&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jun 14 05:07:49 2013
@@ -15,6 +15,7 @@ Release 0.7 (unreleased changes)
IMPROVEMENTS
+ HAMA-761: Improvement for GraphJobMessage comparation operation (MaoYuan
Xian)
HAMA-760: Add new features to existing Multi Layer Perceptron (Yexi Jiang
via edwardyoon)
HAMA-758: Send message to non-exist vertex makes the job fail (MaoYuan Xian
via edwardyoon)
HAMA-757: The partitioning job output should be un-splitable (MaoYuan Xian
via edwardyoon)
Modified:
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1492955&r1=1492954&r2=1492955&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
(original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
Fri Jun 14 05:07:49 2013
@@ -21,10 +21,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -46,6 +48,15 @@ public final class GraphJobMessage imple
private WritableComparable vertexId;
private Writable vertexValue;
private IntWritable verticesSize;
+ private static GraphJobMessageComparator comparator;
+
+ static {
+ if (comparator == null) {
+ comparator = new GraphJobMessageComparator();
+ }
+
+ WritableComparator.define(GraphJobMessage.class, comparator);
+ }
public GraphJobMessage() {
}
@@ -84,6 +95,28 @@ public final class GraphJobMessage imple
}
+ public void fastReadFields(DataInput in) throws IOException {
+ flag = in.readByte();
+ if (isVertexMessage()) {
+ vertexId = GraphJobRunner.createVertexIDObject();
+ vertexId.readFields(in);
+ /*
+ * vertexValue = GraphJobRunner.createVertexValue();
+ * vertexValue.readFields(in);
+ */
+ } else if (isMapMessage()) {
+ map = new MapWritable();
+ map.readFields(in);
+ } else if (isVerticesSizeMessage()) {
+ verticesSize = new IntWritable();
+ verticesSize.readFields(in);
+ } else {
+ vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
+ null);
+ vertexId.readFields(in);
+ }
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
flag = in.readByte();
@@ -162,4 +195,41 @@ public final class GraphJobMessage imple
}
}
+ public static class GraphJobMessageComparator extends WritableComparator {
+ private final DataInputBuffer buffer;
+ private final GraphJobMessage key1;
+ private final GraphJobMessage key2;
+
+ public GraphJobMessageComparator() {
+ this(GraphJobMessage.class);
+ }
+
+ protected GraphJobMessageComparator(
+ Class<? extends WritableComparable<?>> keyClass) {
+ this(keyClass, false);
+ }
+
+ protected GraphJobMessageComparator(
+ Class<? extends WritableComparable<?>> keyClass, boolean
createInstances) {
+ super(keyClass, createInstances);
+ key1 = new GraphJobMessage();
+ key2 = new GraphJobMessage();
+ buffer = new DataInputBuffer();
+ }
+
+ public synchronized int compare(byte[] b1, int s1, int l1, byte[] b2,
+ int s2, int l2) {
+ try {
+ buffer.reset(b1, s1, l1); // parse key1
+ key1.fastReadFields(buffer);
+
+ buffer.reset(b2, s2, l2); // parse key2
+ key2.fastReadFields(buffer);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return compare(key1, key2); // compare them
+ }
+ }
}