Author: dhruba
Date: Fri Jan 16 10:38:24 2009
New Revision: 735082
URL: http://svn.apache.org/viewvc?rev=735082&view=rev
Log:
HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
(Zheng Shao via dhruba).
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jan 16 10:38:24 2009
@@ -643,6 +643,9 @@
HADOOP-4906. Fix TaskTracker OOM by keeping a shallow copy of JobConf in
TaskTracker.TaskInProgress. (Sharad Agarwal via acmurthy)
+ HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
+ (Zheng Shao via dhruba).
+
Release 0.19.0 - 2008-11-18
INCOMPATIBLE CHANGES
Modified:
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java Fri Jan 16 10:38:24 2009
@@ -23,6 +23,8 @@
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
@@ -67,7 +69,7 @@
*/
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor) throws IOException {
- throw new UnsupportedOperationException();
+ return createOutputStream(out);
}
/**
@@ -76,8 +78,8 @@
* @throws java.lang.UnsupportedOperationException
* Throws UnsupportedOperationException
*/
- public Class<org.apache.hadoop.io.compress.Compressor> getCompressorType() {
- throw new UnsupportedOperationException();
+ public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
+ return BZip2DummyCompressor.class;
}
/**
@@ -87,7 +89,7 @@
* Throws UnsupportedOperationException
*/
public Compressor createCompressor() {
- throw new UnsupportedOperationException();
+ return new BZip2DummyCompressor();
}
/**
@@ -112,8 +114,7 @@
*/
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor) throws IOException {
- throw new UnsupportedOperationException();
-
+ return createInputStream(in);
}
/**
@@ -122,8 +123,8 @@
* @throws java.lang.UnsupportedOperationException
* Throws UnsupportedOperationException
*/
- public Class<org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
- throw new UnsupportedOperationException();
+ public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
+ return BZip2DummyDecompressor.class;
}
/**
@@ -133,7 +134,7 @@
* Throws UnsupportedOperationException
*/
public Decompressor createDecompressor() {
- throw new UnsupportedOperationException();
+ return new BZip2DummyDecompressor();
}
/**
@@ -149,14 +150,13 @@
// class data starts here//
private CBZip2OutputStream output;
-
+ private boolean needsReset;
// class data ends here//
public BZip2CompressionOutputStream(OutputStream out)
throws IOException {
super(out);
- writeStreamHeader();
- this.output = new CBZip2OutputStream(out);
+ needsReset = true;
}
private void writeStreamHeader() throws IOException {
@@ -168,32 +168,43 @@
}
}
- public void write(byte[] b, int off, int len) throws IOException {
- this.output.write(b, off, len);
-
- }
-
public void finish() throws IOException {
- this.output.flush();
+ this.output.finish();
+ needsReset = true;
}
+ private void internalReset() throws IOException {
+ if (needsReset) {
+ needsReset = false;
+ writeStreamHeader();
+ this.output = new CBZip2OutputStream(out);
+ }
+ }
+
public void resetState() throws IOException {
-
+ // Cannot write to out at this point because out might not be ready
+ // yet, as in SequenceFile.Writer implementation.
+ needsReset = true;
}
public void write(int b) throws IOException {
+ if (needsReset) {
+ internalReset();
+ }
this.output.write(b);
}
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (needsReset) {
+ internalReset();
+ }
+ this.output.write(b, off, len);
+ }
+
public void close() throws IOException {
this.output.flush();
this.output.close();
- }
-
- protected void finalize() throws IOException {
- if (this.output != null) {
- this.close();
- }
+ needsReset = true;
}
}// end of class BZip2CompressionOutputStream
@@ -202,14 +213,13 @@
// class data starts here//
private CBZip2InputStream input;
-
+ boolean needsReset;
// class data ends here//
public BZip2CompressionInputStream(InputStream in) throws IOException {
super(in);
- BufferedInputStream bufferedIn = readStreamHeader();
- input = new CBZip2InputStream(bufferedIn);
+ needsReset = true;
}
private BufferedInputStream readStreamHeader() throws IOException {
@@ -239,29 +249,39 @@
}// end of method
public void close() throws IOException {
- this.input.close();
+ if (!needsReset) {
+ input.close();
+ needsReset = true;
+ }
}
public int read(byte[] b, int off, int len) throws IOException {
-
+ if (needsReset) {
+ internalReset();
+ }
return this.input.read(b, off, len);
}
+ private void internalReset() throws IOException {
+ if (needsReset) {
+ needsReset = false;
+ BufferedInputStream bufferedIn = readStreamHeader();
+ input = new CBZip2InputStream(bufferedIn);
+ }
+ }
+
public void resetState() throws IOException {
-
+ // Cannot read from bufferedIn at this point because bufferedIn might not be ready
+ // yet, as in SequenceFile.Reader implementation.
+ needsReset = true;
}
public int read() throws IOException {
- return this.input.read();
-
- }
-
- protected void finalize() throws IOException {
- if (this.input != null) {
- this.close();
+ if (needsReset) {
+ internalReset();
}
-
+ return this.input.read();
}
}// end of BZip2CompressionInputStream
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java?rev=735082&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java Fri Jan 16 10:38:24 2009
@@ -0,0 +1,62 @@
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * This is a dummy compressor for BZip2.
+ */
+public class BZip2DummyCompressor implements Compressor {
+
+ @Override
+ public int compress(byte[] b, int off, int len) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void end() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void finish() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean finished() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getBytesRead() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getBytesWritten() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean needsInput() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ // do nothing
+ }
+
+ @Override
+ public void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setInput(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+}
Added:
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java?rev=735082&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java Fri Jan 16 10:38:24 2009
@@ -0,0 +1,52 @@
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * This is a dummy decompressor for BZip2.
+ */
+public class BZip2DummyDecompressor implements Decompressor {
+
+ @Override
+ public int decompress(byte[] b, int off, int len) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void end() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean finished() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean needsDictionary() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean needsInput() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void reset() {
+ // do nothing
+ }
+
+ @Override
+ public void setDictionary(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setInput(byte[] b, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+}
Modified:
hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java Fri Jan 16 10:38:24 2009
@@ -703,13 +703,13 @@
* Overriden to close the stream.
*/
protected void finalize() throws Throwable {
- close();
+ finish();
super.finalize();
}
- public void close() throws IOException {
- OutputStream outShadow = this.out;
- if (outShadow != null) {
+
+ public void finish() throws IOException {
+ if (out != null) {
try {
if (this.runLength > 0) {
writeRun();
@@ -717,7 +717,6 @@
this.currentChar = -1;
endBlock();
endCompression();
- outShadow.close();
} finally {
this.out = null;
this.data = null;
@@ -725,6 +724,14 @@
}
}
+ public void close() throws IOException {
+ if (out != null) {
+ OutputStream outShadow = this.out;
+ finish();
+ outShadow.close();
+ }
+ }
+
public void flush() throws IOException {
OutputStream outShadow = this.out;
if (outShadow != null) {
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java Fri Jan 16 10:38:24 2009
@@ -29,10 +29,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionOutputStream;
public class TestCodec extends TestCase {
@@ -53,7 +60,7 @@
}
public void testBZip2Codec() throws IOException {
- codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
+ codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");
}
private static void codecTest(Configuration conf, int seed, int count,
@@ -96,8 +103,6 @@
deflateOut.write(data.getData(), 0, data.getLength());
deflateOut.flush();
deflateFilter.finish();
- //Necessary to close the stream for BZip2 Codec to write its final output. Flush is not enough.
- deflateOut.close();
LOG.info("Finished compressing data");
// De-compress data
@@ -123,6 +128,74 @@
}
LOG.info("SUCCESS! Completed checking " + count + " records");
}
+
+
+ public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
+ }
+
+ public void testSequenceFileGzipCodec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.GzipCodec", 1000000);
+ }
+
+ public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException,
+ InstantiationException, IllegalAccessException {
+ sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);
+ sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);
+ }
+
+ private static void sequenceFileCodecTest(Configuration conf, int lines,
+ String codecClass, int blockSize)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+
+ Path filePath = new Path("SequenceFileCodecTest." + codecClass);
+ // Configuration
+ conf.setInt("io.seqfile.compress.blocksize", blockSize);
+
+ // Create the SequenceFile
+ FileSystem fs = FileSystem.get(conf);
+ LOG.info("Creating SequenceFile with codec \"" + codecClass + "\"");
+ SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath,
+ Text.class, Text.class, CompressionType.BLOCK,
+ (CompressionCodec)Class.forName(codecClass).newInstance());
+
+ // Write some data
+ LOG.info("Writing to SequenceFile...");
+ for (int i=0; i<lines; i++) {
+ Text key = new Text("key" + i);
+ Text value = new Text("value" + i);
+ writer.append(key, value);
+ }
+ writer.close();
+
+ // Read the data back and check
+ LOG.info("Reading from the SequenceFile...");
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+
+ Writable key = (Writable)reader.getKeyClass().newInstance();
+ Writable value = (Writable)reader.getValueClass().newInstance();
+
+ int lc = 0;
+ try {
+ while (reader.next(key, value)) {
+ assertEquals("key" + lc, key.toString());
+ assertEquals("value" + lc, value.toString());
+ lc ++;
+ }
+ } finally {
+ reader.close();
+ }
+ assertEquals(lines, lc);
+
+ // Delete temporary files
+ fs.delete(filePath, false);
+
+ LOG.info("SUCCESS! Completed SequenceFileCodecTest with codec \"" + codecClass + "\"");
+ }
public static void main(String[] args) {
int count = 10000;