Author: dhruba
Date: Fri Jan 16 11:50:18 2009
New Revision: 735099
URL: http://svn.apache.org/viewvc?rev=735099&view=rev
Log:
HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
(Zheng Shao via dhruba).
Added:
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
- copied unchanged from r735082,
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
- copied unchanged from r735082,
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=735099&r1=735098&r2=735099&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Fri Jan 16 11:50:18 2009
@@ -52,6 +52,9 @@
HADOOP-4906. Fix TaskTracker OOM by keeping a shallow copy of JobConf in
TaskTracker.TaskInProgress. (Sharad Agarwal via acmurthy)
+ HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
+ (Zheng Shao via dhruba).
+
Release 0.19.0 - 2008-11-18
INCOMPATIBLE CHANGES
Propchange: hadoop/core/branches/branch-0.19/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 16 11:50:18 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
-/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870
+/hadoop/core/trunk/CHANGES.txt:697306,698176,699056,699098,699415,699424,699444,699490,699517,700163,700628,700923,701273,701398,703923,704203,704261,704701,704703,704707,704712,704732,704748,704989,705391,705420,705430,705762,706350,706707,706719,706796,706802,707258,707262,708623,708641,708710,708723,709040,709303,711717,712881,713888,720602,723013,723460,723831,723918,724883,727117,727212,727217,727228,727869,732572,732777,733887,734870,735082
Modified:
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/BZip2Codec.java?rev=735099&r1=735098&r2=735099&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/BZip2Codec.java (original)
+++ hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/BZip2Codec.java Fri Jan 16 11:50:18 2009
@@ -23,6 +23,8 @@
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
@@ -67,7 +69,7 @@
*/
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
- throw new UnsupportedOperationException();
+ return createOutputStream(out);
}
/**
@@ -76,8 +78,8 @@
* @throws java.lang.UnsupportedOperationException
* Throws UnsupportedOperationException
*/
- public Class<org.apache.hadoop.io.compress.Compressor> getCompressorType() {
- throw new UnsupportedOperationException();
+ public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
+ return BZip2DummyCompressor.class;
}
/**
@@ -87,7 +89,7 @@
* Throws UnsupportedOperationException
*/
public Compressor createCompressor() {
- throw new UnsupportedOperationException();
+ return new BZip2DummyCompressor();
}
/**
@@ -112,8 +114,7 @@
*/
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor) throws IOException {
- throw new UnsupportedOperationException();
-
+ return createInputStream(in);
}
/**
@@ -122,8 +123,8 @@
* @throws java.lang.UnsupportedOperationException
* Throws UnsupportedOperationException
*/
- public Class<org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
- throw new UnsupportedOperationException();
+ public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
+ return BZip2DummyDecompressor.class;
}
/**
@@ -133,7 +134,7 @@
* Throws UnsupportedOperationException
*/
public Decompressor createDecompressor() {
- throw new UnsupportedOperationException();
+ return new BZip2DummyDecompressor();
}
/**
@@ -149,14 +150,13 @@
// class data starts here//
private CBZip2OutputStream output;
-
+ private boolean needsReset;
// class data ends here//
public BZip2CompressionOutputStream(OutputStream out)
throws IOException {
super(out);
- writeStreamHeader();
- this.output = new CBZip2OutputStream(out);
+ needsReset = true;
}
private void writeStreamHeader() throws IOException {
@@ -168,32 +168,43 @@
}
}
- public void write(byte[] b, int off, int len) throws IOException {
- this.output.write(b, off, len);
-
- }
-
public void finish() throws IOException {
- this.output.flush();
+ this.output.finish();
+ needsReset = true;
}
+ private void internalReset() throws IOException {
+ if (needsReset) {
+ needsReset = false;
+ writeStreamHeader();
+ this.output = new CBZip2OutputStream(out);
+ }
+ }
+
public void resetState() throws IOException {
-
+ // Cannot write to out at this point because out might not be ready
+ // yet, as in SequenceFile.Writer implementation.
+ needsReset = true;
}
public void write(int b) throws IOException {
+ if (needsReset) {
+ internalReset();
+ }
this.output.write(b);
}
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (needsReset) {
+ internalReset();
+ }
+ this.output.write(b, off, len);
+ }
+
public void close() throws IOException {
this.output.flush();
this.output.close();
- }
-
- protected void finalize() throws IOException {
- if (this.output != null) {
- this.close();
- }
+ needsReset = true;
}
}// end of class BZip2CompressionOutputStream
@@ -202,14 +213,13 @@
// class data starts here//
private CBZip2InputStream input;
-
+ boolean needsReset;
// class data ends here//
public BZip2CompressionInputStream(InputStream in) throws IOException {
super(in);
- BufferedInputStream bufferedIn = readStreamHeader();
- input = new CBZip2InputStream(bufferedIn);
+ needsReset = true;
}
private BufferedInputStream readStreamHeader() throws IOException {
@@ -239,29 +249,39 @@
}// end of method
public void close() throws IOException {
- this.input.close();
+ if (!needsReset) {
+ input.close();
+ needsReset = true;
+ }
}
public int read(byte[] b, int off, int len) throws IOException {
-
+ if (needsReset) {
+ internalReset();
+ }
return this.input.read(b, off, len);
}
+ private void internalReset() throws IOException {
+ if (needsReset) {
+ needsReset = false;
+ BufferedInputStream bufferedIn = readStreamHeader();
+ input = new CBZip2InputStream(bufferedIn);
+ }
+ }
+
public void resetState() throws IOException {
-
+ // Cannot read from bufferedIn at this point because bufferedIn might not be ready
+ // yet, as in SequenceFile.Reader implementation.
+ needsReset = true;
}
public int read() throws IOException {
- return this.input.read();
-
- }
-
- protected void finalize() throws IOException {
- if (this.input != null) {
- this.close();
+ if (needsReset) {
+ internalReset();
}
-
+ return this.input.read();
}
}// end of BZip2CompressionInputStream
Modified:
hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java?rev=735099&r1=735098&r2=735099&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java (original)
+++ hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java Fri Jan 16 11:50:18 2009
@@ -703,13 +703,13 @@
* Overriden to close the stream.
*/
protected void finalize() throws Throwable {
- close();
+ finish();
super.finalize();
}
- public void close() throws IOException {
- OutputStream outShadow = this.out;
- if (outShadow != null) {
+
+ public void finish() throws IOException {
+ if (out != null) {
try {
if (this.runLength > 0) {
writeRun();
@@ -717,7 +717,6 @@
this.currentChar = -1;
endBlock();
endCompression();
- outShadow.close();
} finally {
this.out = null;
this.data = null;
@@ -725,6 +724,14 @@
}
}
+ public void close() throws IOException {
+ if (out != null) {
+ OutputStream outShadow = this.out;
+ finish();
+ outShadow.close();
+ }
+ }
+
public void flush() throws IOException {
OutputStream outShadow = this.out;
if (outShadow != null) {
Modified:
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=735099&r1=735098&r2=735099&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/io/compress/TestCodec.java Fri Jan 16 11:50:18 2009
@@ -29,10 +29,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
public class TestCodec extends TestCase {
@@ -65,7 +72,7 @@
}
public void testBZip2Codec() throws IOException {
- codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
+ codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
private static void codecTest(Configuration conf, int seed, int count,
@@ -108,8 +115,6 @@
deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush();
deflateFilter.finish();
- //Necessary to close the stream for BZip2 Codec to write its final output. Flush is not enough.
- deflateOut.close();
LOG.info("Finished compressing data");
// De-compress data
@@ -135,6 +140,68 @@
}
LOG.info("SUCCESS! Completed checking " + count + " records");
}
+
+
+ public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
+ }
+
+ public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
+ }
+
+ private static void sequenceFileCodecTest(Configuration conf, int lines,
+ String codecClass, int blockSize)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+
+ Path filePath = new Path("SequenceFileCodecTest." + codecClass);
+ // Configuration
+ conf.setInt("io.seqfile.compress.blocksize", blockSize);
+
+ // Create the SequenceFile
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("Creating SequenceFile with codec \"" + codecClass + "\"");
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath,
+ Text.class, Text.class, CompressionType.BLOCK,
+ (CompressionCodec)Class.forName(codecClass).newInstance());
+
+ // Write some data
+ LOG.info("Writing to SequenceFile...");
+ for (int i=0; i<lines; i++) {
+ Text key = new Text("key" + i);
+ Text value = new Text("value" + i);
+ writer.append(key, value);
+ }
+ writer.close();
+
+ // Read the data back and check
+ LOG.info("Reading from the SequenceFile...");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+
+ Writable key = (Writable)reader.getKeyClass().newInstance();
+ Writable value = (Writable)reader.getValueClass().newInstance();
+
+ int lc = 0;
+ try {
+ while (reader.next(key, value)) {
+ assertEquals("key" + lc, key.toString());
+ assertEquals("value" + lc, value.toString());
+ lc ++;
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals(lines, lc);
+
+ // Delete temporary files
+ fs.delete(filePath, false);
+
+ LOG.info("SUCCESS! Completed SequenceFileCodecTest with codec \"" + codecClass + "\"");
+ }
public static void main(String[] args) {
int count = 10000;