Author: dhruba
Date: Fri Jan 16 11:55:05 2009
New Revision: 735109
URL: http://svn.apache.org/viewvc?rev=735109&view=rev
Log:
HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
(Zheng Shao via dhruba).
Added:
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
- copied unchanged from r735082,
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
- copied unchanged from r735082,
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=735109&r1=735108&r2=735109&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Jan 16 11:55:05 2009
@@ -590,6 +590,9 @@
HADOOP-4906. Fix TaskTracker OOM by keeping a shallow copy of JobConf in
TaskTracker.TaskInProgress. (Sharad Agarwal via acmurthy)
+ HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
+ (Zheng Shao via dhruba).
+
Release 0.19.0 - 2008-11-18
INCOMPATIBLE CHANGES
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 16 11:55:05 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732777,732838,732869,733887,734870,734916
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732777,732838,732869,733887,734870,734916,735082
Modified:
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/BZip2Codec.java?rev=735109&r1=735108&r2=735109&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
(original)
+++
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
Fri Jan 16 11:55:05 2009
@@ -23,6 +23,8 @@
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
@@ -67,7 +69,7 @@
*/
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
- throw new UnsupportedOperationException();
+ return createOutputStream(out);
}
/**
@@ -76,8 +78,8 @@
* @throws java.lang.UnsupportedOperationException
* Throws UnsupportedOperationException
*/
- public Class<org.apache.hadoop.io.compress.Compressor> getCompressorType() {
- throw new UnsupportedOperationException();
+ public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
+ return BZip2DummyCompressor.class;
}
/**
@@ -87,7 +89,7 @@
* Throws UnsupportedOperationException
*/
public Compressor createCompressor() {
- throw new UnsupportedOperationException();
+ return new BZip2DummyCompressor();
}
/**
@@ -112,8 +114,7 @@
*/
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor) throws IOException {
- throw new UnsupportedOperationException();
-
+ return createInputStream(in);
}
/**
@@ -122,8 +123,8 @@
* @throws java.lang.UnsupportedOperationException
* Throws UnsupportedOperationException
*/
- public Class<org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
- throw new UnsupportedOperationException();
+ public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
+ return BZip2DummyDecompressor.class;
}
/**
@@ -133,7 +134,7 @@
* Throws UnsupportedOperationException
*/
public Decompressor createDecompressor() {
- throw new UnsupportedOperationException();
+ return new BZip2DummyDecompressor();
}
/**
@@ -149,14 +150,13 @@
// class data starts here//
private CBZip2OutputStream output;
-
+ private boolean needsReset;
// class data ends here//
public BZip2CompressionOutputStream(OutputStream out)
throws IOException {
super(out);
- writeStreamHeader();
- this.output = new CBZip2OutputStream(out);
+ needsReset = true;
}
private void writeStreamHeader() throws IOException {
@@ -168,32 +168,43 @@
}
}
- public void write(byte[] b, int off, int len) throws IOException {
- this.output.write(b, off, len);
-
- }
-
public void finish() throws IOException {
- this.output.flush();
+ this.output.finish();
+ needsReset = true;
}
+ private void internalReset() throws IOException {
+ if (needsReset) {
+ needsReset = false;
+ writeStreamHeader();
+ this.output = new CBZip2OutputStream(out);
+ }
+ }
+
public void resetState() throws IOException {
-
+ // Cannot write to out at this point because out might not be ready
+ // yet, as in SequenceFile.Writer implementation.
+ needsReset = true;
}
public void write(int b) throws IOException {
+ if (needsReset) {
+ internalReset();
+ }
this.output.write(b);
}
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (needsReset) {
+ internalReset();
+ }
+ this.output.write(b, off, len);
+ }
+
public void close() throws IOException {
this.output.flush();
this.output.close();
- }
-
- protected void finalize() throws IOException {
- if (this.output != null) {
- this.close();
- }
+ needsReset = true;
}
}// end of class BZip2CompressionOutputStream
@@ -202,14 +213,13 @@
// class data starts here//
private CBZip2InputStream input;
-
+ boolean needsReset;
// class data ends here//
public BZip2CompressionInputStream(InputStream in) throws IOException {
super(in);
- BufferedInputStream bufferedIn = readStreamHeader();
- input = new CBZip2InputStream(bufferedIn);
+ needsReset = true;
}
private BufferedInputStream readStreamHeader() throws IOException {
@@ -239,29 +249,39 @@
}// end of method
public void close() throws IOException {
- this.input.close();
+ if (!needsReset) {
+ input.close();
+ needsReset = true;
+ }
}
public int read(byte[] b, int off, int len) throws IOException {
-
+ if (needsReset) {
+ internalReset();
+ }
return this.input.read(b, off, len);
}
+ private void internalReset() throws IOException {
+ if (needsReset) {
+ needsReset = false;
+ BufferedInputStream bufferedIn = readStreamHeader();
+ input = new CBZip2InputStream(bufferedIn);
+ }
+ }
+
public void resetState() throws IOException {
-
+ // Cannot read from bufferedIn at this point because bufferedIn might not be ready
+ // yet, as in SequenceFile.Reader implementation.
+ needsReset = true;
}
public int read() throws IOException {
- return this.input.read();
-
- }
-
- protected void finalize() throws IOException {
- if (this.input != null) {
- this.close();
+ if (needsReset) {
+ internalReset();
}
-
+ return this.input.read();
}
}// end of BZip2CompressionInputStream
Modified:
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java?rev=735109&r1=735108&r2=735109&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
(original)
+++
hadoop/core/branches/branch-0.20/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
Fri Jan 16 11:55:05 2009
@@ -703,13 +703,13 @@
* Overriden to close the stream.
*/
protected void finalize() throws Throwable {
- close();
+ finish();
super.finalize();
}
- public void close() throws IOException {
- OutputStream outShadow = this.out;
- if (outShadow != null) {
+
+ public void finish() throws IOException {
+ if (out != null) {
try {
if (this.runLength > 0) {
writeRun();
@@ -717,7 +717,6 @@
this.currentChar = -1;
endBlock();
endCompression();
- outShadow.close();
} finally {
this.out = null;
this.data = null;
@@ -725,6 +724,14 @@
}
}
+ public void close() throws IOException {
+ if (out != null) {
+ OutputStream outShadow = this.out;
+ finish();
+ outShadow.close();
+ }
+ }
+
public void flush() throws IOException {
OutputStream outShadow = this.out;
if (outShadow != null) {
Modified:
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=735109&r1=735108&r2=735109&view=diff
==============================================================================
---
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/io/compress/TestCodec.java
(original)
+++
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/io/compress/TestCodec.java
Fri Jan 16 11:55:05 2009
@@ -29,10 +29,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
public class TestCodec extends TestCase {
@@ -53,7 +60,7 @@
}
public void testBZip2Codec() throws IOException {
- codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
+ codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
private static void codecTest(Configuration conf, int seed, int count,
@@ -96,8 +103,6 @@
deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush();
deflateFilter.finish();
- //Necessary to close the stream for BZip2 Codec to write its final output. Flush is not enough.
- deflateOut.close();
LOG.info("Finished compressing data");
// De-compress data
@@ -123,6 +128,68 @@
}
LOG.info("SUCCESS! Completed checking " + count + " records");
}
+
+
+ public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
+ }
+
+ public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
+ }
+
+ private static void sequenceFileCodecTest(Configuration conf, int lines,
+ String codecClass, int blockSize)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+
+ Path filePath = new Path("SequenceFileCodecTest." + codecClass);
+ // Configuration
+ conf.setInt("io.seqfile.compress.blocksize", blockSize);
+
+ // Create the SequenceFile
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("Creating SequenceFile with codec \"" + codecClass + "\"");
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath,
+ Text.class, Text.class, CompressionType.BLOCK,
+ (CompressionCodec)Class.forName(codecClass).newInstance());
+
+ // Write some data
+ LOG.info("Writing to SequenceFile...");
+ for (int i=0; i<lines; i++) {
+ Text key = new Text("key" + i);
+ Text value = new Text("value" + i);
+ writer.append(key, value);
+ }
+ writer.close();
+
+ // Read the data back and check
+ LOG.info("Reading from the SequenceFile...");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+
+ Writable key = (Writable)reader.getKeyClass().newInstance();
+ Writable value = (Writable)reader.getValueClass().newInstance();
+
+ int lc = 0;
+ try {
+ while (reader.next(key, value)) {
+ assertEquals("key" + lc, key.toString());
+ assertEquals("value" + lc, value.toString());
+ lc ++;
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals(lines, lc);
+
+ // Delete temporary files
+ fs.delete(filePath, false);
+
+ LOG.info("SUCCESS! Completed SequenceFileCodecTest with codec \"" + codecClass + "\"");
+ }
public static void main(String[] args) {
int count = 10000;