Thanks for the discussion, Taran,

The problem still persists.
What should be done if I have a record that spans multiple pSplits (physical splits on HDFS)?
What happens if we try to read beyond a pSplit?
Is the next read transparently served from the next block of the same file (which might not be on the same machine), or
is the next block on the local disk (which may not belong to the same file) read instead?

If it's the former, I would expect things to work fine (surprisingly they don't; I must be goofing it up somewhere). If it's the latter, I have no idea how to tackle this. (Any help would be highly appreciated.)
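In case it helps frame the question: a gzip stream cannot be picked up from an arbitrary offset, because decoding must start at a gzip header, so a compressed stream that crosses a pSplit boundary cannot simply be resumed by the next mapper. A minimal sketch with plain java.util.zip (no Hadoop involved; the class and method names here are mine):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipMidStreamDemo {

    // try to fully decode data as one gzip stream; true iff it decodes
    static boolean decodes(byte[] data) {
        try (GZIPInputStream in =
                new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buf = new byte[4096];
            while (in.read(buf) != -1) { /* drain */ }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // build a small gzip stream in memory
    static byte[] sample() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            for (int i = 0; i < 1000; i++) {
                gz.write(("record-" + i + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] all = sample();

        // from the start, the stream decodes fine
        System.out.println("from byte 0: " + decodes(all));   // prints "from byte 0: true"

        // drop even one leading byte and the gzip header check fails:
        // decoding cannot resume from an arbitrary offset inside a split
        byte[] tail = new byte[all.length - 1];
        System.arraycopy(all, 1, tail, 0, tail.length);
        System.out.println("from byte 1: " + decodes(tail));  // prints "from byte 1: false"
    }
}
```

This is why formats meant to be split carry per-record sync markers, while a single gzip member has only one header at byte 0.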



**************************************************************************************************

I tried running a simple program in which I created a sample GZip file by serializing records:

          // serialize Employee objects (3 fields: 1 String, 2 int)
          FileOutputStream fos = new FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
          GZIPOutputStream gz = new GZIPOutputStream(fos);
          ObjectOutputStream oos = new ObjectOutputStream(gz);

          for (int i = 0; i < 500000; i++) {
              Employee sam = new Employee(i + "name", i, i + 50000);
              oos.writeObject(sam);
          }
          oos.flush();
          oos.close();

Now if I just run a simple map-reduce job on this binary file, it throws java.io.EOFException: Unexpected end of ZLIB input stream.
Two splits are created:

Split 1: hdfs://localhost:54310/user/amitsingh/out1  start: 0        length: 1555001  hosts: sandpiper  bytesRemaining: 1555001
Split 2: hdfs://localhost:54310/user/amitsingh/out1  start: 1555001  length: 1555001  hosts: sandpiper

For Map1 --> Split1, I get java.io.EOFException: Unexpected end of ZLIB input stream [for startLens[0] start: 0 len: 1556480]
For Map2 --> no valid GZip header is found, so startLens is empty.

I am not sure why Map1 gets len 1556480 and not 3110002 (the entire file), since there is only ONE GZip stream and it spans the whole file.
Any guidance would be of great help!
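For what it's worth, "Unexpected end of ZLIB input stream" is exactly what java.util.zip raises when a gzip stream is cut off before its deflate data and trailer end, i.e. when only the bytes up to some boundary are handed to GZIPInputStream. A small stdlib repro (the class and method names are mine):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TruncatedGzipDemo {

    // decode as much as possible; return "ok" or the EOFException message
    static String drain(byte[] data) throws IOException {
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
        byte[] buf = new byte[4096];
        try {
            while (in.read(buf) != -1) { /* drain */ }
            return "ok";
        } catch (EOFException e) {
            return e.getMessage();
        } finally {
            in.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // build one large gzip stream in memory
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos);
        for (int i = 0; i < 100000; i++) {
            gz.write(("rec" + i + "\n").getBytes(StandardCharsets.UTF_8));
        }
        gz.close();
        byte[] all = bos.toByteArray();

        // keep the gzip header but cut the stream at a fake "split boundary"
        byte[] half = new byte[all.length / 2];
        System.arraycopy(all, 0, half, 0, half.length);

        System.out.println(drain(all));   // prints "ok"
        System.out.println(drain(half));  // prints "Unexpected end of ZLIB input stream"
    }
}
```

So whenever the record reader hands GZIPInputStream fewer bytes than the full compressed stream, this exception is the expected symptom, not a corrupt file.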

**************************************************************************************************************
Source code
**************************************************************************************************************

package org.apache.hadoop.mapred;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

public class CustomGzipRecordReader implements
       RecordReader<Text, BytesWritable> {

   public static final Log LOG = LogFactory
           .getLog(CustomGzipRecordReader.class);

   protected Configuration conf;
   protected long splitStart = 0;
   protected long pos = 0;
   protected long splitEnd = 0;
   protected long splitLen = 0;
   protected long fileLen = 0;
   protected FSDataInputStream in;
   protected int recordIndex = 0;
   protected long[][] startLens;
   protected byte[] buffer = new byte[4096];

   private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };

   // Scan the split and populate startLens with each offset at which a gzip
   // (zlib) stream starts within this split
   private void parseArcBytes() throws IOException {

       long totalRead = in.getPos();
       byte[] buffer = new byte[4096];
       List<Long> starts = new ArrayList<Long>();

       int read = -1;
       while ((read = in.read(buffer)) > 0) {

           // NOTE: a magic pair that straddles two buffer reads is missed
           // by this loop, since it only scans up to (read - 1)
           for (int i = 0; i < (read - 1); i++) {

               if ((buffer[i] == (byte) 0x1F)
                       && (buffer[i + 1] == (byte) 0x8B)) {
                   long curStart = totalRead + i;
                   in.seek(curStart);
                   byte[] zipbytes = null;
                   try {
                       zipbytes = new byte[32];
                       in.readFully(zipbytes);
                       ByteArrayInputStream zipin = new ByteArrayInputStream(
                               zipbytes);
                       // the GZIPInputStream constructor validates the header
                       GZIPInputStream zin = new GZIPInputStream(zipin);
                       zin.close();
                       zipin.close();
                       starts.add(curStart);
                       LOG.info("curStart: " + curStart);
                   } catch (Exception e) {
                       LOG.info("Ignoring position: " + curStart);
                       continue;
                   }
               }
           }

           totalRead += read;
           in.seek(totalRead); // restore the scan position after header probes
           if (totalRead > splitEnd) {
               break;
           }
       }

       startLens = new long[starts.size()][2];
       for (int i = 0; i < starts.size(); i++) {

           long start = starts.get(i);
           // the last entry's length runs only to totalRead (where scanning
           // stopped, just past splitEnd), not to the end of the file
           long length = ((i < starts.size() - 1) ? starts.get(i + 1)
                   : totalRead)
                   - start;
           startLens[i][0] = start;
           startLens[i][1] = length;
           LOG.info("startLens[" + i + "][0] " + startLens[i][0]
                   + "\t\t" + startLens[i][1]);
       }
   }

   public CustomGzipRecordReader(Configuration conf, FileSplit split)
           throws IOException {

       Path path = split.getPath();
       FileSystem fs = path.getFileSystem(conf);
       fileLen = fs.getFileStatus(split.getPath()).getLen();
       this.conf = conf;
       this.in = fs.open(split.getPath());
       this.splitStart = split.getStart();
       this.splitEnd = splitStart + split.getLength();
       this.splitLen = split.getLength();
       in.seek(splitStart);
       parseArcBytes();

   }

   public void close() throws IOException {
       this.in.close();
   }

   public Text createKey() {
       return (Text) ReflectionUtils.newInstance(Text.class, conf);
   }

   public BytesWritable createValue() {
       return (BytesWritable) ReflectionUtils.newInstance(BytesWritable.class,
               conf);
   }

   public long getPos() throws IOException {
       return in.getPos();
   }

   public float getProgress() throws IOException {

       if (recordIndex == 0) {
           return 0.0f;
       } else {
           // progress = gzip streams consumed / gzip streams found in split;
           // the cast must precede the division to avoid integer truncation
           return Math.min(1.0f, (float) recordIndex / startLens.length);
       }
   }

   public boolean next(Text key, BytesWritable value) throws IOException {
       long start = 0;
       long len = 0;
       try {
           LOG.info("NEXT Called ");
           int index = recordIndex++;
           if (index >= startLens.length) {
               LOG.info("BAD ");
               return false;
           } else {
               LOG.info("GOOD");
           }

           start = startLens[index][0];
           len = startLens[index][1];
           byte[] zipbytes = new byte[(int) len];

           LOG.info("start:" + start + "\tlen" + len);

           in.seek(start);
           in.readFully(zipbytes); // a bare read() may return fewer than len bytes

           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
           GZIPInputStream zin = new GZIPInputStream(zipin);

           int gzipRead = -1;
           int totalGzipRead = 0;
           baos.reset();
           try {
               while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) { // <-------- SOURCE of exception
                   baos.write(buffer, 0, gzipRead);
                   totalGzipRead += gzipRead;
               }
           } catch (Exception ex) {
               ex.printStackTrace();
               LOG.info(ex.toString() + "\nstart:" + start + "\tlen" + len);
               LOG.info(StringUtils.stringifyException(ex)); // was LOG.equals, a silent no-op
           }

           byte[] pageBytes = baos.toByteArray();
           baos.close();
           zin.close();
           zipin.close();

// GZIPInputStream gs = new GZIPInputStream(new ByteArrayInputStream(
//                    bytes.get()));

           // ObjectInputStream ois = new ObjectInputStream(zin);
           // for (int i = 0; i < 500000; i++) {
           // Employee sarah = null;
           // try {
           // sarah = (Employee) ois.readObject();
           // // LOG.info(sarah.printObject());
           // } catch (ClassNotFoundException e) {
           // // TODO Auto-generated catch block
           // LOG.info(e + "start:" + start + "\tlen" + len);
           // e.printStackTrace();
           // }
           // // sarah.print();
           //
           // }
           Text keyText = (Text) key;
           keyText.set("" + index);
           BytesWritable valueBytes = (BytesWritable) value;
           valueBytes.set(pageBytes, 0, pageBytes.length);

           return true;
       } catch (Exception e) {
           e.printStackTrace();
           LOG.info(e.toString() + "start:" + start + "\tlen" + len);
           LOG.info(StringUtils.stringifyException(e)); // was LOG.equals, a silent no-op
           return false;
       }
       }

   }
}

***********************************************************************************

package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/**
 * An input format that reads custom gzip files.
 */
public class CustomGzipInputFormat extends FileInputFormat<Text, BytesWritable> {

   public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split,
           JobConf job, Reporter reporter) throws IOException {
       reporter.setStatus(split.toString());
       return new CustomGzipRecordReader(job, (FileSplit) split);
   }

}

***********************************************************************************


package org.apache.hadoop.examples;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.WordCount.Reduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.CustomGzipInputFormat;
import org.apache.hadoop.mapred.Employee;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomObjectReader extends Configured implements Tool,
       Mapper<Text, BytesWritable, Text, Text> {

   private JobConf jobConf;

// public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);

   public CustomObjectReader() {

   }

   public void configure(JobConf job) {
       this.jobConf = job;
   }

   public void close() {
   }

   public CustomObjectReader(Configuration conf) {
       setConf(conf);
   }

   public void map(Text key, BytesWritable bytes,
           OutputCollector<Text, Text> output, Reporter reporter)
           throws IOException {
       ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
               bytes.get()));
       for (int i = 0; i < 500000; i++) {
           Employee sarah = null;
           try {
               sarah = (Employee) ois.readObject();
           } catch (ClassNotFoundException e) {
               System.out.println("EXCEPTION: " + e);
               e.printStackTrace();
               continue; // sarah is null here; skip rather than NPE below
           }
           sarah.print();
       }

   }

   public void readArcs(Path arcFiles, Path outDir) throws IOException {

   }

   public int run(String[] args) throws Exception {

       String usage = "Usage: CustomObjectReader [-m <maps>] [-r <reduces>] <input> <output>";

       if (args.length < 2) {
           System.err.println(usage);
           return -1;
       }



       JobConf job = new JobConf(getConf(), CustomObjectReader.class);
       job.setJobName("custom reader");

       job.setInputFormat(CustomGzipInputFormat.class);
       job.setMapperClass(CustomObjectReader.class);
       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(Text.class);

       job.setOutputFormat(TextOutputFormat.class);
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(Text.class);

       job.setReducerClass(Reduce.class);

       List<String> other_args = new ArrayList<String>();
       for (int i = 0; i < args.length; ++i) {
           try {
               if ("-m".equals(args[i])) {
                   job.setNumMapTasks(Integer.parseInt(args[++i]));
               } else if ("-r".equals(args[i])) {
                   job.setNumReduceTasks(Integer.parseInt(args[++i]));
               } else {
                   other_args.add(args[i]);
               }
           } catch (NumberFormatException except) {
               System.out.println("ERROR: Integer expected instead of "
                       + args[i]);
               return printUsage();
           } catch (ArrayIndexOutOfBoundsException except) {
               System.out.println("ERROR: Required parameter missing from "
                       + args[i - 1]);
               return printUsage();
           }
       }
       // Make sure there are exactly 2 parameters left.
       if (other_args.size() != 2) {
           System.out.println("ERROR: Wrong number of parameters: "
                   + other_args.size() + " instead of 2.");
           return printUsage();
       }
       FileInputFormat.setInputPaths(job, other_args.get(0));
       FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));

       JobClient.runJob(job);
       return 0;

   }

   static int printUsage() {
       System.out.println(
               "CustomObjectReader [-m <maps>] [-r <reduces>] <input> <output>");
       ToolRunner.printGenericCommandUsage(System.out);
       return -1;
   }

   public static void main(String[] args) throws Exception {
       int res = ToolRunner.run(new Configuration(), new CustomObjectReader(),
               args);
       System.exit(res);
   }

}


***********************************************************************************
package org.apache.hadoop.mapred;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class Employee implements Serializable {
   String name;
   int age;
   int salary;

   public Employee(String name, int age, int salary) {
       this.name = name;
       this.age = age;
       this.salary = salary;
   }

   public void print() {
       System.out.println("Record for: " + name);
       System.out.println("Name: " + name);
       System.out.println("Age: " + age);
       System.out.println("Salary: " + salary);
   }
   public String printObject() {
       return "name: " + name + "\tage: " + age + "\tsalary: " + salary;
   }

   public static void main(String argv[]) throws Exception {
       // create some objects
       Employee sarah = new Employee("S. Jordan", 28, 56000);


           // serialize the objects sarah and sam
           FileOutputStream fos = new FileOutputStream(
                   "/home/amitsingh/OUTPUT/out.bin");
           GZIPOutputStream gz = new GZIPOutputStream(fos);
           ObjectOutputStream oos = new ObjectOutputStream(gz);

           for (int i = 0; i < 500000; i++) {
               Employee sam = new Employee(i + "MyNameIsGreat", i,
                       i + 50000);
               oos.writeObject(sam);
           }
           oos.flush();
           oos.close();
           //fos.close();

           // deserialize objects sarah and sam
           FileInputStream fis = new FileInputStream(
                   "/home/amitsingh/OUTPUT/out.bin");
           GZIPInputStream gs = new GZIPInputStream(fis);
           ObjectInputStream ois = new ObjectInputStream(gs);
           for (int i = 0; i < 10; i++) {
               sarah = (Employee) ois.readObject();
               sarah.print();

           }
           ois.close();
           //fis.close();
       }
}
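As an aside, the write-then-read cycle in the main() above can also be exercised entirely in memory; the sketch below (class and helper names are mine, with a local stand-in for Employee) round-trips objects through one gzip stream and counts them back by reading until EOFException, which is how a clean end of an object stream announces itself:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RoundTripDemo {

    // minimal stand-in for the Employee class from the listing above
    static class Emp implements Serializable {
        final String name;
        final int age;
        final int salary;

        Emp(String name, int age, int salary) {
            this.name = name;
            this.age = age;
            this.salary = salary;
        }
    }

    // serialize n Emp objects into one gzip stream, read them all back,
    // and return how many were recovered
    static int roundTrip(int n) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(new GZIPOutputStream(bos));
        for (int i = 0; i < n; i++) {
            oos.writeObject(new Emp(i + "name", i, i + 50000));
        }
        oos.close(); // also closes the GZIPOutputStream, flushing the trailer

        ObjectInputStream ois = new ObjectInputStream(
                new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray())));
        int count = 0;
        try {
            while (true) {
                ois.readObject();
                count++;
            }
        } catch (EOFException end) {
            // normal end of the object stream
        }
        ois.close();
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(5000)); // prints 5000
    }
}
```

Reading until EOFException (rather than looping a fixed 500000 times) avoids depending on the record count matching exactly what was written.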
