Author: edwardyoon
Date: Mon Mar 2 23:33:59 2015
New Revision: 1663476
URL: http://svn.apache.org/r1663476
Log:
HAMA-928: Fix compressor bugs
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
Modified:
hama/trunk/conf/hama-default.xml
hama/trunk/core/pom.xml
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
hama/trunk/pom.xml
Modified: hama/trunk/conf/hama-default.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Mon Mar 2 23:33:59 2015
@@ -290,15 +290,9 @@
</property>
<property>
<name>hama.messenger.compression.class</name>
- <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
+ <value>org.apache.hama.bsp.message.compress.Bzip2Compressor</value>
<description>The message compression algorithm to choose. Default is
null.</description>
</property>
- <property>
- <name>hama.messenger.compression.threshold</name>
- <value>128</value>
- <description>The Compressor threshold sets the level at which compression begins.
- The default is 128 bytes.</description>
- </property>
<property>
<name>hama.zookeeper.quorum</name>
Modified: hama/trunk/core/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Mon Mar 2 23:33:59 2015
@@ -55,6 +55,11 @@
<artifactId>commons-cli</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.9</version>
+ </dependency>
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon Mar 2
23:33:59 2015
@@ -164,7 +164,6 @@ public interface Constants {
// Other constants
static final String MESSENGER_RUNTIME_COMPRESSION =
"hama.messenger.runtime.compression";
- static final String MESSENGER_COMPRESSION_THRESHOLD = "hama.messenger.compression.threshold";
/**
* An empty instance.
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Mon Mar 2
23:33:59 2015
@@ -422,15 +422,6 @@ public class BSPJob extends BSPJobContex
conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
}
- /**
- * Set compression threshold in bytes.
- *
- * @param ct
- */
- public void setCompressionThreshold(long ct) {
- conf.setLong("hama.messenger.compression.threshold", ct);
- }
-
public void setMessageQueueBehaviour(String queueBehaviour) {
if (queueBehaviour.equals(MessageQueue.PERSISTENT_QUEUE))
conf.setBoolean(MessageQueue.PERSISTENT_QUEUE, true);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Mar
2 23:33:59 2015
@@ -61,7 +61,7 @@ public final class BSPPeerImpl<K1, V1, K
private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
public static enum PeerCounter {
- COMPRESSED_MESSAGES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, TOTAL_MESSAGES_COMBINED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+ TOTAL_DECOMPRESSED_BYTES, SUPERSTEP_SUM, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, TOTAL_MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, TOTAL_MESSAGES_COMBINED, TIME_IN_SYNC_MS, TOTAL_COMPRESSED_BYTES_TRANSFERED
}
private final HamaConfiguration conf;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
(original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon
Mar 2 23:33:59 2015
@@ -350,7 +350,7 @@ public class LocalBSPRunner implements J
@Override
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
MANAGER_MAP.get(addr).localQueueForNextIteration.addBundle(bundle);
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaAsyncMessageManagerImpl.java
Mon Mar 2 23:33:59 2015
@@ -129,11 +129,11 @@ public final class HamaAsyncMessageManag
bundle.write(bufferDos);
byte[] compressed = compressor.compress(byteBuffer.toByteArray());
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
compressed.length);
bspPeerConnection.put(compressed);
} else {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
bspPeerConnection.put(bundle);
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
Mon Mar 2 23:33:59 2015
@@ -123,20 +123,17 @@ public final class HamaMessageManagerImp
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
} else {
- System.out.println(conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false));
-
if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
bundle.write(bufferDos);
byte[] compressed = compressor.compress(byteBuffer.toByteArray());
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
- compressed.length);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_COMPRESSED_BYTES_TRANSFERED, compressed.length);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_DECOMPRESSED_BYTES, byteBuffer.size());
bspPeerConnection.put(compressed);
} else {
- peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
- bundle.getLength());
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGE_BYTES_TRANSFERED, bundle.getLength());
bspPeerConnection.put(bundle);
}
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
Mon Mar 2 23:33:59 2015
@@ -37,7 +37,7 @@ public class BSPMessageCompressorFactory
try {
return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
.getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
- SnappyCompressor.class.getCanonicalName())), conf);
+ Bzip2Compressor.class.getCanonicalName())), conf);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
(original)
+++
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
Mon Mar 2 23:33:59 2015
@@ -23,42 +23,37 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorInputStream;
+import org.apache.commons.compress.compressors.CompressorOutputStream;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
public class Bzip2Compressor<M extends Writable> extends
BSPMessageCompressor<M> {
- private final BZip2Codec codec = new BZip2Codec();
-
@Override
public byte[] compress(byte[] bytes) {
- ByteArrayOutputStream bos = null;
- CompressionOutputStream sos = null;
- DataOutputStream dos = null;
- byte[] compressedBytes = null;
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputStream(bis);
+
+ ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(outBuffer);
+ CompressorOutputStream cos = null;
try {
- bos = new ByteArrayOutputStream();
- sos = codec.createOutputStream(bos);
- dos = new DataOutputStream(sos);
- dos.close(); // Flush the stream as no more data will be sent.
-
- compressedBytes = bos.toByteArray();
- } catch (IOException ioe) {
- LOG.error("Unable to compress", ioe);
- } finally {
- try {
- sos.close();
- bos.close();
- } catch (IOException e) {
- LOG.warn("Failed to close compression streams.", e);
- }
+ cos = new CompressorStreamFactory().createCompressorOutputStream("bzip2",
+ out);
+ IOUtils.copy(in, cos);
+ cos.close();
+ } catch (CompressorException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
}
- return compressedBytes;
+
+ return outBuffer.toByteArray();
}
/**
@@ -70,29 +65,23 @@ public class Bzip2Compressor<M extends W
*/
@Override
public byte[] decompress(byte[] compressedBytes) {
- ByteArrayInputStream bis = null;
- CompressionInputStream sis = null;
- DataInputStream dis = null;
- byte[] bytes = null;
+ ByteArrayInputStream bis = new ByteArrayInputStream(compressedBytes);
+ DataInputStream in = new DataInputStream(bis);
+ ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(outBuffer);
try {
- bis = new ByteArrayInputStream(compressedBytes);
- sis = codec.createInputStream(bis);
- dis = new DataInputStream(sis);
- bytes = IOUtils.toByteArray(dis);
- } catch (IOException ioe) {
- LOG.error("Unable to decompress.", ioe);
- } finally {
- try {
- dis.close();
- sis.close();
- bis.close();
- } catch (IOException e) {
- LOG.warn("Failed to close decompression streams.", e);
- }
- }
- return bytes;
+ final CompressorInputStream cin = new CompressorStreamFactory()
+ .createCompressorInputStream("bzip2", in);
+ IOUtils.copy(cin, out);
+ in.close();
+ } catch (CompressorException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return outBuffer.toByteArray();
}
}
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
Mon Mar 2 23:33:59 2015
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
import org.apache.hama.examples.ClassSerializePrinting;
public class TestBSPMasterGroomServer extends HamaCluster {
@@ -85,8 +85,7 @@ public class TestBSPMasterGroomServer ex
bsp.setOutputValueClass(Text.class);
bsp.setOutputPath(OUTPUT_PATH);
- bsp.setCompressionCodec(SnappyCompressor.class);
- bsp.setCompressionThreshold(40);
+ bsp.setCompressionCodec(Bzip2Compressor.class);
BSPJobClient jobClient = new BSPJobClient(configuration);
configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
Modified:
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL:
http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
(original)
+++
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
Mon Mar 2 23:33:59 2015
@@ -22,11 +22,13 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Iterator;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hama.bsp.BSPMessageBundle;
public class TestBSPMessageCompressor extends TestCase {
@@ -37,33 +39,35 @@ public class TestBSPMessageCompressor ex
assertNull(compressor);
configuration.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
- SnappyCompressor.class, BSPMessageCompressor.class);
+ Bzip2Compressor.class, BSPMessageCompressor.class);
compressor = new BSPMessageCompressorFactory<IntWritable>()
.getCompressor(configuration);
assertNotNull(compressor);
- IntWritable a = new IntWritable(123);
- IntWritable b = new IntWritable(321);
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- a.write(dos);
- b.write(dos);
+ BSPMessageBundle<IntWritable> a = new BSPMessageBundle<IntWritable>();
+ for (int i = 0; i < 10000; i++) {
+ a.addMessage(new IntWritable(i));
+ }
+
+ ByteArrayOutputStream byteBuffer = new ByteArrayOutputStream();
+ DataOutputStream bufferDos = new DataOutputStream(byteBuffer);
+ a.write(bufferDos);
- byte[] x = bos.toByteArray();
-
- byte[] compressed = compressor.compress(x);
+ byte[] compressed = compressor.compress(byteBuffer.toByteArray());
+ assertTrue(byteBuffer.size() > compressed.length);
byte[] decompressed = compressor.decompress(compressed);
ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
- DataInputStream dis = new DataInputStream(bis);
+ DataInputStream in = new DataInputStream(bis);
- IntWritable c = new IntWritable();
- c.readFields(dis);
- assertEquals(123, c.get());
-
- IntWritable d = new IntWritable();
- d.readFields(dis);
- assertEquals(321, d.get());
+ BSPMessageBundle<IntWritable> b = new BSPMessageBundle<IntWritable>();
+ b.readFields(in);
+ Iterator<IntWritable> it = b.iterator();
+ int counter = 0;
+ while (it.hasNext()) {
+ assertTrue(it.next().get() == counter);
+ counter++;
+ }
}
}
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
(original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
Mon Mar 2 23:33:59 2015
@@ -39,7 +39,6 @@ import org.apache.hama.bsp.ClusterStatus
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.bsp.sync.SyncException;
public class PiEstimator {
@@ -117,8 +116,6 @@ public class PiEstimator {
HamaConfiguration conf = new HamaConfiguration();
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
- bsp.setCompressionCodec(SnappyCompressor.class);
- bsp.setCompressionThreshold(40);
// Set the job name
bsp.setJobName("Pi Estimation Example");
Modified:
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
(original)
+++
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
Mon Mar 2 23:33:59 2015
@@ -30,7 +30,7 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.ml.semiclustering.SemiClusterMessage;
import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
@@ -52,8 +52,7 @@ public class SemiClusterJobDriver {
InterruptedException, ClassNotFoundException {
GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
- semiClusterJob.setCompressionCodec(SnappyCompressor.class);
- semiClusterJob.setCompressionThreshold(10);
+ semiClusterJob.setCompressionCodec(Bzip2Compressor.class);
semiClusterJob
.setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
Modified:
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
URL:
http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
(original)
+++
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
Mon Mar 2 23:33:59 2015
@@ -19,9 +19,7 @@
package org.apache.hama.examples;
import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.FileReader;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
@@ -45,7 +43,7 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.ml.semiclustering.SemiClusterMessage;
import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
@@ -53,7 +51,6 @@ import org.apache.hama.ml.semiclustering
import org.apache.hama.ml.semiclustering.SemiClusteringVertex;
import org.junit.Test;
-@SuppressWarnings("unused")
public class SemiClusterMatchingTest extends TestCase {
private static String INPUT = "src/test/resources/semiclustering.txt";
private static String OUTPUT = "/tmp/graph-semiCluster";
@@ -196,8 +193,7 @@ public class SemiClusterMatchingTest ext
GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
semiClusterJob.setMaxIteration(15);
- semiClusterJob.setCompressionCodec(SnappyCompressor.class);
- semiClusterJob.setCompressionThreshold(10);
+ semiClusterJob.setCompressionCodec(Bzip2Compressor.class);
semiClusterJob
.setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
Modified:
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL:
http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
---
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
(original)
+++
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Mon Mar 2 23:33:59 2015
@@ -36,7 +36,7 @@ import org.apache.hama.bsp.HashPartition
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TestBSPMasterGroomServer;
-import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.compress.Bzip2Compressor;
import org.apache.hama.commons.io.TextArrayWritable;
import org.apache.hama.graph.example.PageRank;
import org.apache.hama.graph.example.PageRank.PagerankSeqReader;
@@ -85,7 +85,7 @@ public class TestSubmitGraphJob extends
// set the defaults
bsp.setMaxIteration(30);
- bsp.setCompressionCodec(SnappyCompressor.class);
+ bsp.setCompressionCodec(Bzip2Compressor.class);
bsp.setAggregatorClass(AverageAggregator.class);
bsp.setInputFormat(SequenceFileInputFormat.class);
Modified: hama/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1663476&r1=1663475&r2=1663476&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Mon Mar 2 23:33:59 2015
@@ -92,6 +92,7 @@
<commons-lang.version>2.6</commons-lang.version>
<commons-httpclient.version>3.0.1</commons-httpclient.version>
<commons-io.version>2.4</commons-io.version>
+ <commons-compress.version>1.9</commons-compress.version>
<hadoop.version>1.2.0</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<jetty.version>6.1.14</jetty.version>
@@ -219,6 +220,11 @@
<version>${commons-cli.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons-compress.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>${commons-configuration.version}</version>