Thanks for the discussion, Taran.
The problem still persists.
What should be done if I have a record that spans multiple pSplits
(physical splits on HDFS)?
What happens if we try to read beyond a pSplit?
Is the next read transparently served from the next block of the same file
(which might not be on the same machine), or is the next block on the local
disk read instead (which may not belong to the same file)?
If it's the former, I would expect things to work fine (surprisingly they
don't, so I must be goofing it up somewhere).
If it's the latter, then I have no idea how to tackle this. (Any help would be
highly appreciated.)
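For reference, my mental model of how TextInputFormat's LineRecordReader handles
a record that straddles a split boundary is roughly the sketch below. This is
illustrative only; pos, splitEnd and lineReader are my own stand-ins rather than
the real Hadoop fields, and imports from org.apache.hadoop.io /
org.apache.hadoop.util are omitted. Is this the right picture?

// Sketch only: how a line-based reader deals with a record that crosses
// the split boundary. Names are illustrative, not the real Hadoop fields.
private long pos;              // current byte offset in the file
private long splitEnd;         // end of this reader's split
private LineReader lineReader; // wraps the FSDataInputStream for the file

public boolean next(LongWritable key, Text value) throws IOException {
    // Once a record would start at or beyond splitEnd, stop; the reader that
    // owns the next split takes over (and skips the partial record it lands
    // in the middle of).
    if (pos >= splitEnd) {
        return false;
    }
    key.set(pos);
    // readLine keeps pulling bytes from the same FSDataInputStream even when
    // the record runs past the end of the HDFS block; the stream is opened on
    // the file, not on one block, so the next block is fetched transparently,
    // possibly from another datanode.
    int newSize = lineReader.readLine(value);
    if (newSize == 0) {
        return false;
    }
    pos += newSize;
    return true;
}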
**************************************************************************************************
I tried running a simple program in which I created a sample GZip file
by serializing records:
// serialize the objects sarah and sam
FileOutputStream fos = new FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
GZIPOutputStream gz = new GZIPOutputStream(fos);
ObjectOutputStream oos = new ObjectOutputStream(gz);
for (int i = 0; i < 500000; i++) {
    // 3 fields: 2 int, 1 String
    Employee sam = new Employee(i + "name", i, i + 50000);
    oos.writeObject(sam);
}
oos.flush();
oos.close();
Now if I just run a simple map-reduce job on this binary file, it fails with
java.io.EOFException: Unexpected end of ZLIB input stream.
It creates 2 splits:
Split 1: hdfs://localhost:54310/user/amitsingh/out1  start: 0        length: 1555001  hosts: sandpiper  bytesRemaining: 1555001
Split 2: hdfs://localhost:54310/user/amitsingh/out1  start: 1555001  length: 1555001  hosts: sandpiper
For Map1 --> Split1, I get java.io.EOFException: Unexpected end of ZLIB input stream [for startLens[0] start: 0, len: 1556480]
For Map2 --> no valid GZip stream is found, since startLens is empty.
I am not sure why Map1 ends up with len 1556480 and not 3110002 (the entire
file), as there is ONLY one GZip stream and it spans the whole file.
Any guidance would be of great help.
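For completeness, the only workaround I am sure about is to not split the gzip
file at all, which gives up parallelism, so it is not really what I am after.
A rough sketch against the old mapred FileInputFormat API (isSplitable is the
protected hook on org.apache.hadoop.mapred.FileInputFormat):

public class CustomGzipInputFormat extends FileInputFormat<Text, BytesWritable> {

    // A plain GZIP stream cannot be decompressed starting at an arbitrary
    // offset, so hand the whole file to a single map task.
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new CustomGzipRecordReader(job, (FileSplit) split);
    }
}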
**************************************************************************************************************
Source code
**************************************************************************************************************
package org.apache.hadoop.mapred;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
public class CustomGzipRecordReader implements
RecordReader<Text, BytesWritable> {
public static final Log LOG = LogFactory
.getLog(CustomGzipRecordReader.class);
protected Configuration conf;
protected long splitStart = 0;
protected long pos = 0;
protected long splitEnd = 0;
protected long splitLen = 0;
protected long fileLen = 0;
protected FSDataInputStream in;
protected int recordIndex = 0;
protected long[][] startLens;
protected byte[] buffer = new byte[4096];
private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
// check the split and populate startLens, indicating at which offsets a
// GZIP member starts in this split
private void parseArcBytes() throws IOException {
long totalRead = in.getPos();
byte[] buffer = new byte[4096];
List<Long> starts = new ArrayList<Long>();
int read = -1;
while ((read = in.read(buffer)) > 0) {
for (int i = 0; i < (read - 1); i++) {
if ((buffer[i] == (byte) 0x1F)
&& (buffer[i + 1] == (byte) 0x8B)) {
long curStart = totalRead + i;
in.seek(curStart);
byte[] zipbytes = null;
try {
zipbytes = new byte[32];
in.read(zipbytes);
ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
GZIPInputStream zin = new GZIPInputStream(zipin);
zin.close();
zipin.close();
starts.add(curStart);
LOG.info("curStart: " + (curStart));
} catch (Exception e) {
LOG.info("Ignoring position: " + (curStart));
continue;
}
}
}
totalRead += read;
in.seek(totalRead);
if (totalRead > splitEnd) {
break;
}
}
startLens = new long[starts.size()][2];
for (int i = 0; i < starts.size(); i++) {
long start = starts.get(i);
long length = ((i < starts.size() - 1) ? starts.get(i + 1) : totalRead) - start;
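// note: for the last member found, totalRead is where the scan stopped (only a
// little past splitEnd), so this length covers just the current split; a GZIP
// member that continues into the next split ends up with a truncated length here.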
startLens[i][0] = start;
startLens[i][1] = length;
System.out.println("startLens[" + i + "][0] " + startLens[i][0]
+ "\t\t" + startLens[i][1]);
}
}
public CustomGzipRecordReader(Configuration conf, FileSplit split)
throws IOException {
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
fileLen = fs.getFileStatus(split.getPath()).getLen();
this.conf = conf;
this.in = fs.open(split.getPath());
this.splitStart = split.getStart();
this.splitEnd = splitStart + split.getLength();
this.splitLen = split.getLength();
in.seek(splitStart);
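// note: for any split other than the first, this seek lands in the middle of
// the GZIP stream, so the scan below only finds members whose magic bytes
// happen to start inside this split.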
parseArcBytes();
}
public void close() throws IOException {
this.in.close();
}
public Text createKey() {
return (Text) ReflectionUtils.newInstance(Text.class, conf);
}
public BytesWritable createValue() {
return (BytesWritable) ReflectionUtils.newInstance(BytesWritable.class, conf);
}
public long getPos() throws IOException {
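// note: always reports 0; as far as I know this is only informational, but
// returning in.getPos() here would be more accurate.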
return 0;
}
public float getProgress() throws IOException {
if (recordIndex == 0) {
return 0.0f;
} else {
// progress is the number of records handed out so far divided by the number
// of GZIP members found in this split (float division, not integer division)
return Math.min(1.0f, (float) recordIndex / startLens.length);
}
}
public boolean next(Text key, BytesWritable value) throws IOException {
long start = 0;
long len = 0;
try {
LOG.info("NEXT Called ");
int index = recordIndex++;
if (index >= startLens.length) {
LOG.info("BAD ");
return false;
} else {
LOG.info("GOOD");
}
start = startLens[index][0];
len = startLens[index][1];
byte[] zipbytes = new byte[(int) len];
LOG.info("start:" + start + "\tlen" + len);
in.seek(start);
in.read(zipbytes);
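// note: read(byte[]) is not guaranteed to fill the whole array in one call;
// in.readFully(start, zipbytes) would guarantee all len bytes are read,
// provided they exist in the file.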
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
GZIPInputStream zin = new GZIPInputStream(zipin);
int gzipRead = -1;
int totalGzipRead = 0;
baos.reset();
try {
while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) { // <-------- SOURCE of exception
baos.write(buffer, 0, gzipRead);
totalGzipRead += gzipRead;
}
} catch (Exception ex) {
ex.printStackTrace();
LOG.info(ex.toString() + "\nstart:" + start + "\tlen" + len);
LOG.info(StringUtils.stringifyException(ex));
}
byte[] pageBytes = baos.toByteArray();
baos.close();
zin.close();
zipin.close();
// GZIPInputStream gs = new GZIPInputStream(new ByteArrayInputStream(bytes.get()));
// ObjectInputStream ois = new ObjectInputStream(zin);
// for (int i = 0; i < 500000; i++) {
// Employee sarah = null;
// try {
// sarah = (Employee) ois.readObject();
// // LOG.info(sarah.printObject());
// } catch (ClassNotFoundException e) {
// // TODO Auto-generated catch block
// LOG.info(e + "start:" + start + "\tlen" + len);
// e.printStackTrace();
// }
// // sarah.print();
//
// }
Text keyText = (Text) key;
keyText.set(""+index);
BytesWritable valueBytes = (BytesWritable) value;
valueBytes.set(pageBytes, 0, pageBytes.length);
return true;
} catch (Exception e) {
e.printStackTrace();
LOG.info(e.toString() + "start:" + start + "\tlen" + len);
LOG.info(StringUtils.stringifyException(e));
return false;
}
}
}
***********************************************************************************
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/**
* A input format the reads custom gzip files.
*/
public class CustomGzipInputFormat extends FileInputFormat<Text, BytesWritable> {
public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
return new CustomGzipRecordReader(job, (FileSplit) split);
}
}
***********************************************************************************
package org.apache.hadoop.examples;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.WordCount.Reduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.CustomGzipInputFormat;
import org.apache.hadoop.mapred.Employee;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class CustomObjectReader extends Configured implements Tool,
Mapper<Text, BytesWritable, Text, Text> {
private JobConf jobConf;
// public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);
public CustomObjectReader() {
}
public void configure(JobConf job) {
this.jobConf = job;
}
public void close() {
}
public CustomObjectReader(Configuration conf) {
setConf(conf);
}
public void map(Text key, BytesWritable bytes,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.get()));
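// note: BytesWritable.get() returns the whole backing buffer, which can be
// longer than the valid data; new ByteArrayInputStream(bytes.get(), 0,
// bytes.getSize()) (getBytes()/getLength() on newer Hadoop) limits the stream
// to the bytes actually set by the record reader.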
long count =0;
for (int i = 0; i < 500000; i++) {
Employee sarah = null;
try {
sarah = (Employee) ois.readObject();
} catch (ClassNotFoundException e) {
System.out.println("EXCEPTION" + e);
e.printStackTrace();
}
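// note: if readObject() failed above, sarah is still null here and the next
// line throws a NullPointerException.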
sarah.print();
}
}
public void readArcs(Path arcFiles, Path outDir) throws IOException {
}
public int run(String[] args) throws Exception {
String usage = "Usage: CustomObjectReader <input> <output>";
if (args.length < 2) {
System.err.println(usage);
return -1;
}
JobConf job = new JobConf(getConf(), CustomObjectReader.class);
job.setJobName("custom reader");
job.setInputFormat(CustomGzipInputFormat.class);
job.setMapperClass(CustomObjectReader.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<String>();
for (int i = 0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
job.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
job.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of "
+ args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from "
+ args[i - 1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: "
+ other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(job, other_args.get(0));
FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
JobClient.runJob(job);
return 0;
}
static int printUsage() {
System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new
CustomObjectReader(),
args);
System.exit(res);
}
}
***********************************************************************************
package org.apache.hadoop.mapred;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class Employee implements Serializable {
String name;
int age;
int salary;
public Employee(String name, int age, int salary) {
this.name = name;
this.age = age;
this.salary = salary;
}
public void print() {
System.out.println("Record for: " + name);
System.out.println("Name: " + name);
System.out.println("Age: " + age);
System.out.println("Salary: " + salary);
}
public String printObject() {
return "name" +name +"\tage" + age +"\tSalary: " + salary;
}
public static void main(String argv[]) throws Exception {
// create some objects
org.apache.hadoop.mapred.Employee sarah = new Employee("S. Jordan", 28, 56000);
// serialize the objects sarah and sam
FileOutputStream fos = new FileOutputStream(
"/home/amitsingh/OUTPUT/out.bin");
GZIPOutputStream gz = new GZIPOutputStream(fos);
ObjectOutputStream oos = new ObjectOutputStream(gz);
for (int i = 0; i < 500000; i++) {
org.apache.hadoop.mapred.Employee sam = new Employee(i +
"MyNameIsGreat", i,
i + 50000);
oos.writeObject(sam);
}
oos.flush();
oos.close();
//fos.close();
// deserialize objects sarah and sam
FileInputStream fis = new
FileInputStream("/home/amitsingh/OUTPUT/out.bin");
GZIPInputStream gs = new GZIPInputStream(fis);
ObjectInputStream ois = new ObjectInputStream(gs);
for (int i = 0; i < 10; i++) {
sarah = (org.apache.hadoop.mapred.Employee)
ois.readObject();
sarah.print();
}
ois.close();
//fis.close();
}
}