Author: dhruba
Date: Fri Jan 16 10:38:24 2009
New Revision: 735082

URL: http://svn.apache.org/viewvc?rev=735082&view=rev
Log:
HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
(Zheng Shao via dhruba).


Added:
    hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
    hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
    hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
    hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jan 16 10:38:24 2009
@@ -643,6 +643,9 @@
     HADOOP-4906. Fix TaskTracker OOM by keeping a shallow copy of JobConf in
     TaskTracker.TaskInProgress. (Sharad Agarwal via acmurthy) 
 
+    HADOOP-4918. Fix bzip2 compression to work with Sequence Files.
+    (Zheng Shao via dhruba).
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/BZip2Codec.java Fri Jan 16 10:38:24 2009
@@ -23,6 +23,8 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
+import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
 import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
 import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;
 
@@ -67,7 +69,7 @@
    */
   public CompressionOutputStream createOutputStream(OutputStream out,
       Compressor compressor) throws IOException {
-    throw new UnsupportedOperationException();
+    return createOutputStream(out);
   }
 
   /**
@@ -76,8 +78,8 @@
   * @throws java.lang.UnsupportedOperationException
   *             Throws UnsupportedOperationException
   */
-  public Class<org.apache.hadoop.io.compress.Compressor> getCompressorType() {
-    throw new UnsupportedOperationException();
+  public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
+    return BZip2DummyCompressor.class;
   }
 
   /**
@@ -87,7 +89,7 @@
   *             Throws UnsupportedOperationException
   */
   public Compressor createCompressor() {
-    throw new UnsupportedOperationException();
+    return new BZip2DummyCompressor();
   }
 
   /**
@@ -112,8 +114,7 @@
   */
   public CompressionInputStream createInputStream(InputStream in,
       Decompressor decompressor) throws IOException {
-    throw new UnsupportedOperationException();
-
+    return createInputStream(in);
   }
 
   /**
@@ -122,8 +123,8 @@
   * @throws java.lang.UnsupportedOperationException
   *             Throws UnsupportedOperationException
   */
-  public Class<org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
-    throw new UnsupportedOperationException();
+  public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
+    return BZip2DummyDecompressor.class;
   }
 
   /**
@@ -133,7 +134,7 @@
   *             Throws UnsupportedOperationException
   */
   public Decompressor createDecompressor() {
-    throw new UnsupportedOperationException();
+    return new BZip2DummyDecompressor();
   }
 
   /**
@@ -149,14 +150,13 @@
 
     // class data starts here//
     private CBZip2OutputStream output;
-
+    private boolean needsReset; 
     // class data ends here//
 
     public BZip2CompressionOutputStream(OutputStream out)
         throws IOException {
       super(out);
-      writeStreamHeader();
-      this.output = new CBZip2OutputStream(out);
+      needsReset = true;
     }
 
     private void writeStreamHeader() throws IOException {
@@ -168,32 +168,43 @@
       }
     }
 
-    public void write(byte[] b, int off, int len) throws IOException {
-      this.output.write(b, off, len);
-
-    }
-
     public void finish() throws IOException {
-      this.output.flush();
+      this.output.finish();
+      needsReset = true;
     }
 
+    private void internalReset() throws IOException {
+      if (needsReset) {
+        needsReset = false;
+        writeStreamHeader();
+        this.output = new CBZip2OutputStream(out);
+      }
+    }    
+    
     public void resetState() throws IOException {
-
+      // Cannot write to out at this point because out might not be ready
+      // yet, as in SequenceFile.Writer implementation.
+      needsReset = true;
     }
 
     public void write(int b) throws IOException {
+      if (needsReset) {
+        internalReset();
+      }
       this.output.write(b);
     }
 
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (needsReset) {
+        internalReset();
+      }
+      this.output.write(b, off, len);
+    }
+
     public void close() throws IOException {
       this.output.flush();
       this.output.close();
-    }
-
-    protected void finalize() throws IOException {
-      if (this.output != null) {
-        this.close();
-      }
+      needsReset = true;
     }
 
   }// end of class BZip2CompressionOutputStream
@@ -202,14 +213,13 @@
 
     // class data starts here//
     private CBZip2InputStream input;
-
+    boolean needsReset;
     // class data ends here//
 
     public BZip2CompressionInputStream(InputStream in) throws IOException {
 
       super(in);
-      BufferedInputStream bufferedIn = readStreamHeader();
-      input = new CBZip2InputStream(bufferedIn);
+      needsReset = true;
     }
 
     private BufferedInputStream readStreamHeader() throws IOException {
@@ -239,29 +249,39 @@
     }// end of method
 
     public void close() throws IOException {
-      this.input.close();
+      if (!needsReset) {
+        input.close();
+        needsReset = true;
+      }
     }
 
     public int read(byte[] b, int off, int len) throws IOException {
-
+      if (needsReset) {
+        internalReset();
+      }
       return this.input.read(b, off, len);
 
     }
 
+    private void internalReset() throws IOException {
+      if (needsReset) {
+        needsReset = false;
+        BufferedInputStream bufferedIn = readStreamHeader();
+        input = new CBZip2InputStream(bufferedIn);
+      }
+    }    
+    
     public void resetState() throws IOException {
-
+      // Cannot read from bufferedIn at this point because bufferedIn might not be ready
+      // yet, as in SequenceFile.Reader implementation.
+      needsReset = true;
     }
 
     public int read() throws IOException {
-      return this.input.read();
-
-    }
-
-    protected void finalize() throws IOException {
-      if (this.input != null) {
-        this.close();
+      if (needsReset) {
+        internalReset();
       }
-
+      return this.input.read();
     }
 
   }// end of BZip2CompressionInputStream

Added: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java?rev=735082&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyCompressor.java Fri Jan 16 10:38:24 2009
@@ -0,0 +1,62 @@
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * This is a dummy compressor for BZip2.
+ */
+public class BZip2DummyCompressor implements Compressor {
+
+  @Override
+  public int compress(byte[] b, int off, int len) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void end() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void finish() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean finished() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBytesRead() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getBytesWritten() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean needsInput() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reset() {
+    // do nothing
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+}

Added: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java?rev=735082&view=auto
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java (added)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/BZip2DummyDecompressor.java Fri Jan 16 10:38:24 2009
@@ -0,0 +1,52 @@
+package org.apache.hadoop.io.compress.bzip2;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * This is a dummy decompressor for BZip2.
+ */
+public class BZip2DummyDecompressor implements Decompressor {
+
+  @Override
+  public int decompress(byte[] b, int off, int len) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void end() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean finished() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean needsInput() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void reset() {
+    // do nothing
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+}

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/io/compress/bzip2/CBZip2OutputStream.java Fri Jan 16 10:38:24 2009
@@ -703,13 +703,13 @@
   * Overriden to close the stream.
   */
   protected void finalize() throws Throwable {
-    close();
+    finish();
     super.finalize();
   }
 
-  public void close() throws IOException {
-    OutputStream outShadow = this.out;
-    if (outShadow != null) {
+  
+  public void finish() throws IOException {
+    if (out != null) {
       try {
         if (this.runLength > 0) {
           writeRun();
@@ -717,7 +717,6 @@
         this.currentChar = -1;
         endBlock();
         endCompression();
-        outShadow.close();
       } finally {
         this.out = null;
         this.data = null;
@@ -725,6 +724,14 @@
     }
   }
 
+  public void close() throws IOException {
+    if (out != null) {
+      OutputStream outShadow = this.out;
+      finish();
+      outShadow.close();
+    }
+  }
+  
   public void flush() throws IOException {
     OutputStream outShadow = this.out;
     if (outShadow != null) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=735082&r1=735081&r2=735082&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodec.java Fri Jan 16 10:38:24 2009
@@ -29,10 +29,17 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.RandomDatum;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 
 public class TestCodec extends TestCase {
@@ -53,7 +60,7 @@
   }
   
   public void testBZip2Codec() throws IOException {    
-      codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");    
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");    
   }
 
   private static void codecTest(Configuration conf, int seed, int count, 
@@ -96,8 +103,6 @@
     deflateOut.write(data.getData(), 0, data.getLength());
     deflateOut.flush();
     deflateFilter.finish();
-    //Necessary to close the stream for BZip2 Codec to write its final output.  Flush is not enough.
-    deflateOut.close();
     LOG.info("Finished compressing data");
     
     // De-compress data
@@ -123,6 +128,74 @@
     }
     LOG.info("SUCCESS! Completed checking " + count + " records");
   }
+
+
+  public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, 
+      InstantiationException, IllegalAccessException {
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
+    sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
+  }
+  
+  public void testSequenceFileGzipCodec() throws IOException, ClassNotFoundException, 
+      InstantiationException, IllegalAccessException {
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.GzipCodec", 100);
+    sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.GzipCodec", 1000000);
+  }
+  
+  public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, 
+      InstantiationException, IllegalAccessException {
+    sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);    
+    sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);    
+  }
+  
+  private static void sequenceFileCodecTest(Configuration conf, int lines, 
+                                String codecClass, int blockSize) 
+    throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
+
+    Path filePath = new Path("SequenceFileCodecTest." + codecClass);
+    // Configuration
+    conf.setInt("io.seqfile.compress.blocksize", blockSize);
+    
+    // Create the SequenceFile
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("Creating SequenceFile with codec \"" + codecClass + "\"");
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath, 
+        Text.class, Text.class, CompressionType.BLOCK, 
+        (CompressionCodec)Class.forName(codecClass).newInstance());
+    
+    // Write some data
+    LOG.info("Writing to SequenceFile...");
+    for (int i=0; i<lines; i++) {
+      Text key = new Text("key" + i);
+      Text value = new Text("value" + i);
+      writer.append(key, value);
+    }
+    writer.close();
+    
+    // Read the data back and check
+    LOG.info("Reading from the SequenceFile...");
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+    
+    Writable key = (Writable)reader.getKeyClass().newInstance();
+    Writable value = (Writable)reader.getValueClass().newInstance();
+    
+    int lc = 0;
+    try {
+      while (reader.next(key, value)) {
+        assertEquals("key" + lc, key.toString());
+        assertEquals("value" + lc, value.toString());
+        lc ++;
+      }
+    } finally {
+      reader.close();
+    }
+    assertEquals(lines, lc);
+
+    // Delete temporary files
+    fs.delete(filePath, false);
+
+    LOG.info("SUCCESS! Completed SequenceFileCodecTest with codec \"" + codecClass + "\"");
+  }
   
   public static void main(String[] args) {
     int count = 10000;


Reply via email to