Author: edwardyoon
Date: Wed Sep 26 00:48:58 2012
New Revision: 1390242
URL: http://svn.apache.org/viewvc?rev=1390242&view=rev
Log:
Add compressor threshold.
Modified:
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
Modified: hama/trunk/conf/hama-default.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Wed Sep 26 00:48:58 2012
@@ -205,6 +205,17 @@
<name>hama.messenger.class</name>
<value>org.apache.hama.bsp.message.HadoopMessageManagerImpl</value>
</property>
+
+ <property>
+ <name>hama.messenger.compression.class</name>
+ <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
+ <description>The message compression algorithm to choose.</description>
+ </property>
+ <property>
+ <name>hama.messenger.compression.threshold</name>
+ <value>1048576</value>
+ <description>Threshold in bytes: compression of a message bundle is attempted
only when its serialized size exceeds this value.</description>
+ </property>
<property>
<name>hama.zookeeper.quorum</name>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Wed Sep 26
00:48:58 2012
@@ -264,7 +264,8 @@ public class BSPJob extends BSPJobContex
/**
* Sets the compression codec that should be used to compress messages.
*/
- public void setCompressionCodec(Class<? extends BSPMessageCompressor<?>>
clazz) {
+ @SuppressWarnings({ "rawtypes" })
+ public void setCompressionCodec(Class<? extends BSPMessageCompressor> clazz)
{
conf.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS, clazz,
BSPMessageCompressor.class);
}
@@ -396,4 +397,13 @@ public class BSPJob extends BSPJobContex
protected void setCheckPointFlag(boolean enableCheckPoint) {
conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
}
+
+ /**
+ * Sets the message compression threshold in bytes. The value is stored in the
+ * job configuration under the key "hama.messenger.compression.threshold".
+ *
+ * @param ct the threshold, in bytes, to store under that key
+ */
+ public void setCompressionThreshold(long ct) {
+ conf.setLong("hama.messenger.compression.threshold", ct);
+ }
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed
Sep 26 00:48:58 2012
@@ -17,8 +17,10 @@
*/
package org.apache.hama.bsp;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Wed Sep 26 00:48:58 2012
@@ -96,13 +96,14 @@ public final class HadoopMessageManagerI
@Override
public final void transfer(InetSocketAddress addr, BSPMessageBundle<M>
bundle)
throws IOException {
-
HadoopMessageManager<M> bspPeerConnection =
this.getBSPPeerConnection(addr);
if (bspPeerConnection == null) {
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
} else {
- if (compressor != null) {
+ if (compressor != null
+ && CompressionUtil.getBundleSize(bundle) > conf.getLong(
+ "hama.messenger.compression.threshold", 1048576)) {
BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
if (CompressionUtil.getCompressionRatio(compMsgBundle, bundle) < 1.0) {
bspPeerConnection.put(compMsgBundle);
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
Wed Sep 26 00:48:58 2012
@@ -35,7 +35,7 @@ public class MessageManagerFactory {
Configuration conf) throws ClassNotFoundException {
return (MessageManager<M>) ReflectionUtils.newInstance(conf
.getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
- org.apache.hama.bsp.message.AvroMessageManagerImpl.class
+ org.apache.hama.bsp.message.HadoopMessageManagerImpl.class
.getCanonicalName())), conf);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
Wed Sep 26 00:48:58 2012
@@ -26,7 +26,7 @@ import org.apache.hama.bsp.BSPMessageBun
* Provides utilities for compressing and decompressing BSPMessageBundle.
*
*/
-public interface BSPMessageCompressor<M extends Writable> {
+public abstract class BSPMessageCompressor<M extends Writable> {
public static final Log LOG = LogFactory.getLog(BSPMessageCompressor.class);
@@ -37,7 +37,7 @@ public interface BSPMessageCompressor<M
* @param bundle
* @return
*/
- public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle);
+ public abstract BSPCompressedBundle compressBundle(BSPMessageBundle<M>
bundle);
/**
* Decompresses a BSPCompressedBundle and returns the corresponding
@@ -46,5 +46,5 @@ public interface BSPMessageCompressor<M
* @param compMsgBundle
* @return
*/
- public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle
compMsgBundle);
+ public abstract BSPMessageBundle<M> decompressBundle(BSPCompressedBundle
compMsgBundle);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
Wed Sep 26 00:48:58 2012
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hama.bsp.BSPMessageBundle;
-public class Bzip2Compressor<M extends Writable> implements
+public class Bzip2Compressor<M extends Writable> extends
BSPMessageCompressor<M> {
private final BZip2Codec codec = new BZip2Codec();
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
Wed Sep 26 00:48:58 2012
@@ -28,7 +28,7 @@ import org.apache.hama.bsp.BSPMessageBun
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
-public class SnappyCompressor<M extends Writable> implements
+public class SnappyCompressor<M extends Writable> extends
BSPMessageCompressor<M> {
@Override
Modified:
hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/CompressionUtil.java Wed
Sep 26 00:48:58 2012
@@ -50,4 +50,15 @@ public class CompressionUtil {
return (compLen / bos.toByteArray().length);
}
+ /**
+ * Measures a bundle by serializing it into an in-memory buffer.
+ *
+ * @param bundle the bundle to measure; its write method is invoked once
+ * @return the number of bytes the bundle produced when written
+ * @throws IOException if writing the bundle or closing the stream fails
+ */
+ public static long getBundleSize(BSPMessageBundle<?> bundle)
+ throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ bundle.write(dos);
+
+ // Closing the wrapper flushes it and closes the underlying byte buffer.
+ dos.close();
+
+ // size() is the count of bytes written; toByteArray().length gave the same
+ // number but copied the entire buffer first.
+ return bos.size();
+ }
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1390242&r1=1390241&r2=1390242&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
Wed Sep 26 00:48:58 2012
@@ -32,6 +32,7 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.DiskQueue;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.examples.ClassSerializePrinting;
public class TestBSPMasterGroomServer extends HamaCluster {
@@ -84,6 +85,9 @@ public class TestBSPMasterGroomServer ex
bsp.setOutputKeyClass(IntWritable.class);
bsp.setOutputValueClass(Text.class);
bsp.setOutputPath(OUTPUT_PATH);
+
+ bsp.setCompressionCodec(SnappyCompressor.class);
+ bsp.setCompressionThreshold(40);
BSPJobClient jobClient = new BSPJobClient(configuration);
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);