Hi,
I wrote a UDF load function "public class CslTextLoader extends
FileInputLoadFunc implements LoadPushDown, LoadMetadata { ...... }" referring to
PigStorage.
Compared to PigStorage, CslTextLoader lacks the store feature and changes the
input format like this:
@Override
public InputFormat getInputFormat() {
return new XRecordInputFormat();
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split) {
in = (XLineRecordReader)reader;
if (tagFile || tagPath) {
sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
}
}
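For context, the getNext() side follows the usual PigStorage pattern. Below is a simplified sketch (not my exact code; in, fieldDel and mTupleFactory stand for the reader set in prepareToRead, the byte value of the '=' delimiter and TupleFactory.getInstance(); it uses org.apache.pig.data.DataByteArray / Tuple and java.util.ArrayList):
@Override
public Tuple getNext() throws IOException {
    // Sketch of the PigStorage-style tokenizing; the real class also applies
    // the pushed-down projection, since it implements LoadPushDown.
    if (in == null || !in.nextKeyValue()) {
        return null;                                  // end of split
    }
    Text value = in.getCurrentValue();                // one multi-line record
    byte[] buf = value.getBytes();
    int len = value.getLength();
    List<Object> fields = new ArrayList<Object>();
    int start = 0;
    for (int i = 0; i < len; i++) {
        if (buf[i] == fieldDel) {                     // fieldDel = (byte) '='
            fields.add(new DataByteArray(buf, start, i));
            start = i + 1;
        }
    }
    fields.add(new DataByteArray(buf, start, len));   // trailing field
    return mTupleFactory.newTupleNoCopy(fields);
}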
I have attached XRecordInputFormat (which extends
FileInputFormat<LongWritable, Text> and is not the same as
PigTextInputFormat) and XLineRecordReader to this email.
I wrote a Pig script like this:
A = load 'pigTest/pigTest.txt' using CslTextLoader('=') as (f0,f1,f2,f3,f4:{(g1)},f5,f6:{(t1)});
C = filter A by f1 == 'LogRouteUpdate';
illustrate C;
D = foreach C generate f0 as LogLen, f1 as LogType, f5 as Strenth, f6 as OtherStren:{(t1:int)};
illustrate D;
dump D;
The issue is this: D can be illustrated correctly, like this:
---------------------------------------------------------------------------------------------------------------------------------------------------------
| C | f0:bytearray | f1:bytearray   | f2:bytearray | f3:bytearray | f4:bag{:tuple(t1:bytearray)} | f5:bytearray | f6:bag{:tuple(t1:bytearray)} |
---------------------------------------------------------------------------------------------------------------------------------------------------------
|   | 119          | LogRouteUpdate | 10.88.46.100 | null         | {}                           | 8            | {(13), (8)}                  |
---------------------------------------------------------------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------------
| D | LogLen:bytearray | LogType:bytearray | Strenth:bytearray | OtherStren:bag{:tuple(t1:int)} |
------------------------------------------------------------------------------------------------------
|   | 119              | LogRouteUpdate    | 8                 | {(13), (8)}                    |
------------------------------------------------------------------------------------------------------
But dump D gives a wrong result: it does NOT pick up f5 and f6 correctly. The dump output looks like
this:
(119,LogRouteUpdate,10.88.46.100,)
And "store D using PigStorage" produces the same wrong result.
It seems that the AS schema is not matched by generate, dump or store.
What should I pay attention to, when rewriting the InputFormat and
RecordReader, so that the schema matches for generate and dump? Can you give
me some suggestions? Thank you!
BR.
Squall Luo
---- Attachment 1: XLineRecordReader.java ----

package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* Treats keys as offset in file and value as line.
*/
public class XLineRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(XLineRecordReader.class);
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
private Seekable filePosition;
private CompressionCodec codec;
private Decompressor decompressor;
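// A line containing this string marks the end of one multi-line record.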
public static final String SEPARATE_STR =
"=================================================================";
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
//System.out.println("XLineRecordReader:: initialize " + start + "~"+end +
" : " + split.getPath().toString());
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = (Seekable)cIn;
} else {
in = new LineReader(codec.createInputStream(fileIn, decompressor),
job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
// if (start != 0) {
// //start += in.readLine(new Text(), 0, maxBytesToConsume(start));
// start += readXLine(new Text(), 0, maxBytesToConsume(start));
// }
this.pos = start;
}
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) {
return isCompressedInput()
? Integer.MAX_VALUE
: (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
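/**
 * Reads one multi-line record: physical lines are appended (separated by
 * '\n') until a line containing SEPARATE_STR is seen or the stream ends.
 * @return the number of bytes consumed, 0 at end of stream
 */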
public int readXLine(Text str, int maxLineLength, int maxBytesToConsume)
throws IOException {
int xlineSize = 0;
int curLineSize = 0;
Text tmpValue = new Text();
String tmpXLineStr = new String();
curLineSize = in.readLine(tmpValue,maxLineLength,maxBytesToConsume);
//System.out.println("readXLine(out of while): " + maxLineLength +
"-" + maxBytesToConsume + tmpValue.toString());
while(curLineSize>0)
{
xlineSize += curLineSize;
tmpXLineStr = tmpXLineStr + tmpValue.toString() + "\n";
if(tmpValue.toString().indexOf(SEPARATE_STR)>=0)
{
break;
}
curLineSize =
in.readLine(tmpValue,this.maxLineLength,this.maxLineLength);
//System.out.println("readXLine(while): " + maxLineLength +
"-" + maxBytesToConsume + tmpValue.toString());
}
//System.out.println("readXLine: " + tmpXLineStr);
//System.out.println("readXLine(length): " + xlineSize);
str.set(tmpXLineStr);
return xlineSize;
}
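/**
 * Advances to the next multi-line record. The key is the byte offset at
 * which the record starts and the value is the record text assembled by
 * readXLine().
 */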
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end) {
// newSize = in.readLine(value, maxLineLength,
// Math.max(maxBytesToConsume(pos), maxLineLength));
newSize = readXLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
break;
}
pos += newSize;
if (newSize < maxLineLength) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
//System.out.println("nextKeyValue: " + key.toString() + " - " +
value.toString());
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f,
(getFilePosition() - start) / (float)(end - start));
}
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
}
---- Attachment 2: XRecordInputFormat.java ----

package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapreduce.lib.input.*;
/**
 * XRecordInputFormat splits the input so that every split contains a whole
 * number of multi-line records, where a record is terminated by a line
 * containing SEPARATE_STR. Up to maxNumRecords records go into one split.
 *
 * Keys are byte offsets in the file and values are the full (multi-line)
 * record text, i.e. (k, v) is (LongWritable, Text). Unlike NLineInputFormat,
 * which feeds a fixed number of physical lines to each map task, the split
 * boundaries here follow the record separator.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class XRecordInputFormat extends FileInputFormat<LongWritable, Text> {
// public static final String LINES_PER_MAP =
// "mapreduce.input.lineinputformat.linespermap";
public static int maxNumRecords = 10000;
//public static int currNumRecords = 10000;
public static final String SEPARATE_STR =
"=================================================================";
public RecordReader<LongWritable, Text> createRecordReader (
InputSplit genericSplit, TaskAttemptContext context)
throws IOException {
context.setStatus(genericSplit.toString());
//return new LineRecordReader()
XLineRecordReader xRreader = new XLineRecordReader();
xRreader.initialize(genericSplit, context);
return xRreader;
}
protected boolean isSplitable(JobContext context, Path filename) {
//System.out.println("isSplitable: true");
return true;
//return false;
}
/**
 * Logically splits the set of input files for the job so that each split
 * holds up to maxNumRecords separator-terminated records.
 *
 * @see FileInputFormat#getSplits(JobContext)
 */
public List<InputSplit> getSplits(JobContext job)
throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
//int numLinesPerSplit = getNumLinesPerSplit(job);
for (FileStatus status : listStatus(job)) {
splits.addAll(getSplitsForFile(status,
job.getConfiguration(), SEPARATE_STR));
}
//System.out.println("getSplits: " + splits.toString());
return splits;
}
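/**
 * Builds the splits for a single file: the file is scanned line by line and
 * a new split is started every maxNumRecords records, where a record ends
 * at a line containing strSeparator. Any remaining tail of the file becomes
 * the last split.
 */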
public static List<FileSplit> getSplitsForFile(FileStatus status,
Configuration conf, String strSeparator ) throws IOException {
List<FileSplit> splits = new ArrayList<FileSplit> ();
Path fileName = status.getPath();
if (status.isDir()) {
throw new IOException("Not a file: " + fileName);
}
FileSystem fs = fileName.getFileSystem(conf);
LineReader lr = null;
try {
FSDataInputStream in = fs.open(fileName);
lr = new LineReader(in, conf);
Text line = new Text();
int numLines = 0;
int numRecords = 0;
long begin = 0;
long length = 0;
int num = -1;
while ((num = lr.readLine(line)) > 0) {
//System.out.println("getSplitsForFile: " + line.toString());
numLines++;
length += num;
if (line.toString().indexOf(strSeparator) >=0 ) {
numRecords ++;
}
if(numRecords == maxNumRecords)
{
splits.add(createFileSplit(fileName, begin, length));
begin += length;
length = 0;
numLines = 0;
numRecords = 0;
}
}
if (numLines != 0) {
splits.add(createFileSplit(fileName, begin, length));
}
} finally {
if (lr != null) {
lr.close();
}
}
return splits;
}
/**
 * Like LineRecordReader, XLineRecordReader always reads (and consumes)
 * at least one character out of its upper split boundary. So to make
 * sure that each mapper gets whole records, we move back the upper
 * split limit of each split by one character here.
 * @param fileName Path of file
 * @param begin the position of the first byte in the file to process
 * @param length number of bytes in InputSplit
 * @return FileSplit
 */
protected static FileSplit createFileSplit(Path fileName, long begin, long
length) {
// return (begin == 0)
// ? new FileSplit(fileName, begin, length - 1, new String[] {})
// : new FileSplit(fileName, begin - 1, length, new String[] {});
if(begin == 0)
{
//System.out.println("createFileSplit: " +
fileName.toString() + " ("+ begin + "," + (begin +length - 1) + ")");
return new FileSplit(fileName, begin, length - 1, new
String[] {});
}
else
{
//System.out.println("createFileSplit: " +
fileName.toString() + " ("+ (begin - 1) + "," + (begin +length) + ")");
return new FileSplit(fileName, begin - 1, length, new
String[] {});
}
}
/**
 * Set the number of records per split
 * @param job the job to modify
 * @param numRecords the number of records per split
 */
public static void setNumRecordsPerSplit(Job job, int numRecords) {
//job.getConfiguration().setInt(LINES_PER_MAP, numLines);
maxNumRecords = numRecords;
}
/**
 * Get the number of records per split
 * @param job the job
 * @return the number of records per split
 */
public static int getNumRecordsPerSplit(JobContext job) {
//return job.getConfiguration().getInt(LINES_PER_MAP, 1);
return maxNumRecords;
}
}