In case it is useful to anyone, here is a codec for reading data from
.tar.gz files.

The assumption is that instead of a single gzipped text file, you have
many text files that were tarred and then gzipped. The codec therefore
simply concatenates the contents of all files inside the .tar.gz archive.

The source is based on GzipCodec.java. It also depends on JavaTar from
http://gjt.org/pkgdoc/com/ice/tar/index.html, which has been released into
the public domain.

It passes the codec unit tests, and we have successfully used it to
process around a hundred gigabytes of data.


-- 
Andraz Tori, CTO
Zemanta Ltd, New York, London, Ljubljana
www.zemanta.com
mail: [email protected]
tel: +386 41 515 767
twitter: andraz, skype: minmax_test


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 
Questions about the code: Andraz Tori - [email protected] 
 
 */

package org.apache.hadoop.io.compress;

import java.io.*;
import java.util.zip.GZIPOutputStream;
import java.util.zip.GZIPInputStream;

import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.zlib.*;
import com.ice.tar.*;

/**
 * Codec for {@code .tar.gz} archives.
 *
 * <p>Reading: the input is gunzipped and un-tarred, and the contents of all
 * archive entries are returned back to back as one continuous byte stream.
 *
 * <p>Writing: all data goes into a single tar entry named {@code stream.txt},
 * which is then gzipped.
 *
 * <p>This codec has no Hadoop {@link Compressor}/{@link Decompressor}
 * implementation. {@link #createCompressor()} and {@link #createDecompressor()}
 * return {@code null}. A (de)compressor passed to the two-argument factory
 * methods is ignored, because such an object could neither produce nor parse
 * the tar framing.
 */
public class TarGzipCodec extends DefaultCodec {

  /**
   * Output stream that writes all data into one tar entry
   * ({@code stream.txt}) of a JavaTar {@code TarGzOutputStream}.
   */
  protected static class TarGzipOutputStream extends CompressorStream {

    /** Tar+gzip sink; left {@code null} by the protected constructor. */
    TarGzOutputStream tgzout;

    /** Set once {@link #close()} has run, so that a repeated close is a no-op. */
    private boolean closed;

    /**
     * Wraps an existing compressor stream.
     *
     * <p>NOTE: this constructor does not set {@link #tgzout}. The write, finish
     * and close methods of this class will throw {@code NullPointerException}
     * unless it is assigned afterwards.
     */
    protected TarGzipOutputStream(CompressorStream out) {
      super(out);
    }

    /**
     * Wraps {@code out} in a {@code TarGzOutputStream} and opens the single
     * {@code stream.txt} entry that all subsequent writes go into.
     *
     * @param out the raw destination stream
     * @throws IOException if the tar stream cannot be created or the entry
     *     cannot be opened
     */
    public TarGzipOutputStream(OutputStream out) throws IOException {
      super(new TarGzOutputStream(out));
      tgzout = (TarGzOutputStream) this.out;
      tgzout.putNextEntry(new TarEntry("stream.txt"));
    }

    /**
     * Closes the open tar entry and then the underlying stream.
     *
     * <p>The underlying stream is closed even if closing the entry fails.
     * Calls after the first one do nothing, as required by
     * {@link java.io.Closeable#close()}.
     */
    public void close() throws IOException {
      if (closed) {
        return;
      }
      closed = true;
      try {
        tgzout.closeEntry();
      } finally {
        tgzout.close();
      }
    }

    /**
     * Intentionally does nothing: flushing was found to break the
     * {@code TarGzOutputStream}.
     */
    public void flush() throws IOException {
    }

    /** Writes a single byte into the tar entry. */
    public void write(int b) throws IOException {
      tgzout.write(b);
    }

    /**
     * Writes {@code length} bytes from {@code data}, starting at
     * {@code offset}, into the tar entry.
     */
    public void write(byte[] data, int offset, int length)
      throws IOException {
      tgzout.write(data, offset, length);
    }

    /**
     * Delegates to {@code TarGzOutputStream.finish()}. The open tar entry is
     * not closed here; that happens in {@link #close()}.
     */
    public void finish() throws IOException {
      tgzout.finish();
    }

    /**
     * Resetting is not supported for tar output. This only prints a warning
     * to stdout, so that callers which reset streams routinely do not fail.
     */
    public void resetState() throws IOException {
      System.out.println("resetState() not supported on .tar.gz");
    }
  }

  /**
   * Input stream that gunzips and un-tars an archive, and returns the contents
   * of all its entries back to back, as if they were a single file.
   */
  protected static class TarGzipInputStream extends DecompressorStream {

    /** Tar reader over the gunzipped input; {@code null} with the protected constructor. */
    TarInputStream tgzin;

    /** Entry currently being read, or {@code null} once the archive is exhausted. */
    TarEntry entry;

    /**
     * Opens {@code in} as a gzipped tar archive and positions the stream at
     * the first entry.
     *
     * @param in the raw {@code .tar.gz} byte stream
     * @throws IOException if the gzip header or the first tar header cannot be read
     */
    public TarGzipInputStream(InputStream in) throws IOException {
      super(new TarInputStream(new GZIPInputStream(in)));
      tgzin = (TarInputStream) this.in;
      nextEntry();
    }

    /**
     * Advances to the next archive entry. Sets {@link #entry} to {@code null}
     * when there are no more entries.
     */
    private void nextEntry() throws IOException {
      entry = tgzin.getNextEntry();
    }

    /**
     * Allows subclasses to supply the underlying stream directly.
     *
     * <p>Leaves {@link #tgzin} and {@link #entry} {@code null}. The read methods
     * therefore report end of stream immediately, and {@link #close()} will
     * throw {@code NullPointerException} unless {@code tgzin} is assigned.
     */
    protected TarGzipInputStream(DecompressorStream in) {
      super(in);
    }

    /**
     * Returns {@code tgzin.available()} for the first entry, current or later,
     * where it is positive. Entries reporting 0 are skipped along the way.
     * Returns 0 once the archive is exhausted.
     *
     * <p>NOTE(review): this assumes {@code TarInputStream.available()} reports
     * 0 only when the current entry has no bytes left. Confirm against the
     * JavaTar version in use.
     */
    public int available() throws IOException {
      while (entry != null) {
        int remaining = tgzin.available();
        if (remaining > 0) {
          return remaining;
        }
        nextEntry();
      }
      return 0;
    }

    /** Closes the tar reader and, through it, the underlying streams. */
    public void close() throws IOException {
      tgzin.close();
    }

    /** Scratch buffer used by the single-byte {@link #read()}. */
    byte[] oneByte = new byte[1];

    /**
     * Reads one byte. This goes through {@link #read(byte[], int, int)}, so
     * that crossing entry boundaries is handled in one place.
     *
     * @return the byte as an unsigned value in 0..255, or -1 at end of archive
     */
    public int read() throws IOException {
      return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
    }

    /**
     * Reads up to {@code len} bytes, moving on to later entries whenever the
     * current one is exhausted. Keeps reading until {@code len} bytes have been
     * read or the archive ends.
     *
     * @return the number of bytes read; 0 if {@code len} is 0; -1 if the
     *     archive is exhausted and no bytes were read
     */
    public int read(byte[] data, int offset, int len) throws IOException {
      // Per the InputStream contract, a zero-length read returns 0 and must
      // not have side effects such as advancing to the next entry.
      if (len == 0) {
        return 0;
      }
      if (entry == null) {
        return -1;
      }
      int total = 0;
      while (total < len) {
        int n = tgzin.read(data, offset + total, len - total);
        if (n >= 0) {
          total += n;
          continue;
        }
        // The current entry is exhausted: move on to the next one.
        nextEntry();
        if (entry == null) {
          return (total > 0) ? total : -1;
        }
      }
      return total;
    }

    /**
     * Skips up to {@code offset} bytes, crossing entry boundaries as needed.
     *
     * @return the number of bytes actually skipped. This is 0 when
     *     {@code offset <= 0} or the archive is already exhausted, and is never
     *     negative.
     */
    public long skip(long offset) throws IOException {
      // Guard against non-positive requests: tgzin.skip would return 0, and
      // the loop below would then discard the rest of the current entry.
      if (offset <= 0 || entry == null) {
        return 0;
      }
      long skipped = 0;
      while (skipped < offset) {
        long n = tgzin.skip(offset - skipped);
        if (n > 0) {
          skipped += n;
          continue;
        }
        // Nothing could be skipped in this entry, so treat it as exhausted.
        nextEntry();
        if (entry == null) {
          break;
        }
      }
      return skipped;
    }

    /**
     * Resetting is not supported for tar input. This only prints a warning to
     * stdout, so that callers which reset streams routinely do not fail.
     */
    public void resetState() throws IOException {
      System.out.println("resetState() not supported on .tar.gz");
    }
  }

  /**
   * Creates a stream that writes {@code .tar.gz} output to {@code out}.
   */
  public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
    return new TarGzipOutputStream(out);
  }

  /**
   * Creates a stream that writes {@code .tar.gz} output to {@code out}.
   *
   * <p>{@code compressor} is ignored. A generic {@link Compressor} would write
   * a plain compressed stream without tar framing, which is not
   * {@code .tar.gz} data.
   */
  public CompressionOutputStream createOutputStream(OutputStream out,
                                                    Compressor compressor)
  throws IOException {
    return createOutputStream(out);
  }

  /** This codec has no {@link Compressor}; always returns {@code null}. */
  public Compressor createCompressor() {
    return null;
  }

  /**
   * Creates a stream that reads the concatenated contents of all entries of
   * the {@code .tar.gz} archive in {@code in}.
   */
  public CompressionInputStream createInputStream(InputStream in)
  throws IOException {
    return new TarGzipInputStream(in);
  }

  /**
   * Creates a stream that reads the concatenated contents of all entries of
   * the {@code .tar.gz} archive in {@code in}.
   *
   * <p>{@code decompressor} is ignored. A generic {@link Decompressor} cannot
   * parse tar framing, so it would return tar headers mixed in with the data.
   */
  public CompressionInputStream createInputStream(InputStream in,
                                                  Decompressor decompressor)
  throws IOException {
    return createInputStream(in);
  }

  /** This codec has no {@link Decompressor}; always returns {@code null}. */
  public Decompressor createDecompressor() {
    return null;
  }

  /** Returns {@code ".tar.gz"}. */
  public String getDefaultExtension() {
    return ".tar.gz";
  }

}

Reply via email to