Author: edwardyoon
Date: Thu Nov 8 02:20:13 2012
New Revision: 1406895
URL: http://svn.apache.org/viewvc?rev=1406895&view=rev
Log:
Add getApproximateSize to BSPMessageBundle
Removed:
hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
Modified: hama/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Nov 8 02:20:13 2012
@@ -19,6 +19,7 @@ Release 0.6 (unreleased changes)
IMPROVEMENTS
+ HAMA-644: Add getApproximateSize to BSPMessageBundle (edwardyoon)
HAMA-655: Add Exception handling for parsing of vertex (edwardyoon)
HAMA-646: Add deleting temporary files method in TestSubmitGraphJob
(Yuesheng Hu via edwardyoon)
HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon &
tjungblut)
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Nov 8 02:20:13 2012
@@ -17,13 +17,16 @@
*/
package org.apache.hama.bsp;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
@@ -58,6 +61,7 @@ public class BSPMessageBundle<M extends
LinkedList<M> list = new LinkedList<M>();
list.add(message);
messages.put(className, list);
+ list = null;
} else {
messages.get(className).add(message);
}
@@ -73,6 +77,42 @@ public class BSPMessageBundle<M extends
return mergeList;
}
+  /**
+   * Estimates the serialized size of this bundle in bytes without necessarily
+   * serializing every message.
+   * <p>
+   * If the bundle is empty, or holds a single message class with fewer than
+   * {@code sample} messages, the bundle is serialized exactly via
+   * {@link #write(DataOutput)} and that byte count is returned. Otherwise, for
+   * each message class {@code sample} messages are drawn uniformly at random
+   * (with replacement) and serialized individually; each class's average
+   * sampled size is multiplied by that class's own message count, and the
+   * summed class-name lengths plus 4 bytes are added to the total.
+   *
+   * @return the approximate size of bundle object in bytes
+   * @throws IOException if serializing the bundle or a sampled message fails
+   */
+  public long getApproximateSize() throws IOException {
+    final int sample = 20;
+
+    // Empty or small single-class bundles are measured exactly. This also
+    // avoids dividing by a zero sample count when there is nothing to sample.
+    if (messages.isEmpty()
+        || (messages.size() == 1
+            && messages.values().iterator().next().size() < sample)) {
+      DataOutputStream exact = new DataOutputStream(new ByteArrayOutputStream());
+      write(exact);
+      exact.close();
+      return exact.size();
+    }
+
+    long estimatedMsgBytes = 0L;
+    long classNames = 0L;
+    // one reusable buffer; reset() before each sample so size() is per message
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(buffer);
+    for (Map.Entry<String, LinkedList<M>> e : messages.entrySet()) {
+      classNames += e.getKey().length();
+      LinkedList<M> c = e.getValue();
+      if (c.isEmpty()) {
+        continue;
+      }
+
+      long sampledBytes = 0L;
+      for (int i = 0; i < sample; i++) {
+        // Math.random() is in [0, 1), so idx spans [0, c.size() - 1] inclusive
+        int idx = (int) (Math.random() * c.size());
+        buffer.reset();
+        c.get(idx).write(dos);
+        dos.flush();
+        sampledBytes += buffer.size();
+      }
+      // weight this class's average sampled size by its own message count
+      estimatedMsgBytes += sampledBytes * c.size() / sample;
+    }
+    dos.close();
+
+    return estimatedMsgBytes + classNames + 4;
+  }
+
@Override
public void write(DataOutput out) throws IOException {
// writes the k/v mapping size
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Thu Nov 8 02:20:13 2012
@@ -32,7 +32,6 @@ import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
-import org.apache.hama.util.CompressionUtil;
import org.apache.hama.util.LRUCache;
/**
@@ -102,18 +101,12 @@ public final class HadoopMessageManagerI
+ " to transfer messages to!");
} else {
if (compressor != null) {
- float bundleSize = CompressionUtil.getBundleSize(bundle);
- if (bundleSize > conf.getLong("hama.messenger.compression.threshold",
- 1048576)) {
+ if (bundle.getApproximateSize() > conf.getLong(
+ "hama.messenger.compression.threshold", 1048576)) {
BSPCompressedBundle compMsgBundle =
compressor.compressBundle(bundle);
- if (CompressionUtil.getCompressionRatio(
- (float) compMsgBundle.getData().length, bundleSize) < 1.0) {
- bspPeerConnection.put(compMsgBundle);
- } else {
- bspPeerConnection.put(bundle);
- }
+ bspPeerConnection.put(compMsgBundle);
} else {
- bspPeerConnection.put(bundle);
+ bspPeerConnection.put(bundle);
}
} else {
bspPeerConnection.put(bundle);
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1406895&r1=1406894&r2=1406895&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMessageBundle.java Thu Nov 8 02:20:13 2012
@@ -28,10 +28,26 @@ import java.util.Arrays;
import junit.framework.TestCase;
-import org.apache.hadoop.io.BytesWritable;;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
public class TestBSPMessageBundle extends TestCase {
+ public void testApproximateSize() throws IOException {
+ BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
+ for (int i = 0; i < 100; i++) {
+ bundle.addMessage(new IntWritable(i));
+ }
+
+ assertTrue(bundle.getApproximateSize() > 400
+ && bundle.getApproximateSize() < 500);
+
+ bundle = new BSPMessageBundle<IntWritable>();
+ bundle.addMessage(new IntWritable(1));
+ assertTrue(bundle.getApproximateSize() > 40
+ && bundle.getApproximateSize() < 50);
+ }
+
public void testEmpty() throws IOException {
BSPMessageBundle<BytesWritable> bundle = new
BSPMessageBundle<BytesWritable>();
// Serialize it.