Hi,
 
I wrote a UDF load function, "public class CslTextLoader extends 
FileInputLoadFunc implements LoadPushDown, LoadMetadata { ......}", modeled on 
PigStorage.
Compared to PigStorage, CslTextLoader has no store support, and it changes the 
input format like this:
    @Override
    public InputFormat getInputFormat() {
         return new XRecordInputFormat();
    }
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        in = (XLineRecordReader)reader;
        if (tagFile || tagPath) {
            sourcePath = ((FileSplit)split.getWrappedSplit()).getPath();
        }
    }
I have attached XRecordInputFormat (which extends 
FileInputFormat<LongWritable, Text> and is not the same as 
PigTextInputFormat) and XLineRecordReader to this email.
 
I wrote a Pig script like this:
A = load 'pigTest/pigTest.txt' using CslTextLoader('=') as (f0,f1,f2,f3,f4:{(g1)},f5,f6:{(t1)});
C = filter A by f1 == 'LogRouteUpdate';
illustrate C;
D = foreach C generate f0 as LogLen, f1 as LogType, f5 as Strenth, f6 as OtherStren:{(t1:int)};
illustrate D;
dump D;
 
The issue is this:
D is illustrated correctly, like this:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| C     | f0:bytearray    | f1:bytearray    | f2:bytearray    | f3:bytearray    | f4:bag{:tuple(t1:bytearray)}          | f5:bytearray    | f6:bag{:tuple(t1:bytearray)}          |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|       | 119             | LogRouteUpdate  | 10.88.46.100    | null            | {}                                    | 8               | {(13), (8)}                           |
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------
| D     | LogLen:bytearray    | LogType:bytearray    | Strenth:bytearray    | OtherStren:bag{:tuple(t1:int)}          |
-----------------------------------------------------------------------------------------------------------------------
|       | 119                 | LogRouteUpdate       | 8                    | {(13), (8)}                             |
-----------------------------------------------------------------------------------------------------------------------
 
But "dump D" goes wrong: it does NOT locate f5 and f6 correctly. The dump 
result looks like this:
(119,LogRouteUpdate,10.88.46.100,)
"store D using PigStorage" produces the same wrong result.
 
It seems that the AS schema is not matched by generate, dump, or store. 
What should I pay attention to when rewriting the InputFormat and 
RecordReader so that the schema is matched correctly by generate and dump? 
Can you give me some suggestions? Thank you!
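
One thing that may be worth checking here, stated as an assumption because the 
CslTextLoader source is not included in this mail: a loader that implements 
LoadPushDown can be asked by Pig to return only the columns a script actually 
uses (f0, f1, f5 and f6 for D). If the loader accepts that request but 
getNext() still returns all seven fields, the later positions would no longer 
line up, which would be consistent with the dump output above. Below is a 
minimal plain-Java sketch of applying such a column mask. ProjectionSketch, 
project and the boolean mask are hypothetical names for illustration only; 
they are not Pig API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a loader's projection handling; not Pig API.
class ProjectionSketch {

    // Keep only the fields whose index is marked as required.
    // A null mask means no projection was requested, so all fields are kept.
    static List<String> project(List<String> fields, boolean[] required) {
        if (required == null) {
            return new ArrayList<>(fields);
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
            if (i < required.length && required[i]) {
                out.add(fields.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The seven fields f0..f6 of the sample row from the illustrate output.
        List<String> row = Arrays.asList(
            "119", "LogRouteUpdate", "10.88.46.100", null, "{}", "8", "{(13),(8)}");
        // Assumed mask for "generate f0, f1, f5, f6".
        boolean[] required = {true, true, false, false, false, true, true};
        System.out.println(project(row, required));
    }
}
```

With the mask for f0, f1, f5 and f6, the row keeps four fields in that order; 
with no mask, all seven fields are returned unchanged.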
 
BR.
Squall Luo
 
 
 
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as one record: all lines up to
 * and including the next line that contains SEPARATE_STR.
 */
public class XLineRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(XLineRecordReader.class);

  private CompressionCodecFactory compressionCodecs = null;
  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;
  private Seekable filePosition;
  private CompressionCodec codec;
  private Decompressor decompressor;
  
  public static final String SEPARATE_STR =
      "=================================================================";

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    
    //System.out.println("XLineRecordReader:: initialize " + start + "~" + end + " : " + split.getPath().toString());
    
    final Path file = split.getPath();
    compressionCodecs = new CompressionCodecFactory(job);
    codec = compressionCodecs.getCodec(file);

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

    if (isCompressedInput()) {
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new LineReader(cIn, job);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = (Seekable)cIn;
      } else {
        in = new LineReader(codec.createInputStream(fileIn, decompressor),
            job);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new LineReader(fileIn, job);
      filePosition = fileIn;
    }
    // The stock LineRecordReader throws away the first line of every split
    // except the first, because it always reads one extra line in next().
    // That skip is disabled here, so each split is read from its exact start.
//    if (start != 0) {
//      //start += in.readLine(new Text(), 0, maxBytesToConsume(start));
//      start += readXLine(new Text(), 0, maxBytesToConsume(start));
//    }
    this.pos = start;
  }
  
  private boolean isCompressedInput() {
    return (codec != null);
  }

  private int maxBytesToConsume(long pos) {
    return isCompressedInput()
      ? Integer.MAX_VALUE
      : (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    long retVal;
    if (isCompressedInput() && null != filePosition) {
      retVal = filePosition.getPos();
    } else {
      retVal = pos;
    }
    return retVal;
  }
  
  public int readXLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    int xlineSize = 0;
    int curLineSize = 0;
    Text tmpValue = new Text();
    StringBuilder tmpXLineStr = new StringBuilder();
    curLineSize = in.readLine(tmpValue, maxLineLength, maxBytesToConsume);
    //System.out.println("readXLine(out of while): " + maxLineLength + "-" + maxBytesToConsume + tmpValue.toString());
    // Append lines until a separator line has been consumed or input ends.
    while (curLineSize > 0) {
      xlineSize += curLineSize;
      tmpXLineStr.append(tmpValue.toString()).append("\n");
      if (tmpValue.toString().indexOf(SEPARATE_STR) >= 0) {
        break;
      }
      curLineSize =
          in.readLine(tmpValue, this.maxLineLength, this.maxLineLength);
      //System.out.println("readXLine(while): " + maxLineLength + "-" + maxBytesToConsume + tmpValue.toString());
    }
    //System.out.println("readXLine: " + tmpXLineStr);
    //System.out.println("readXLine(length): " + xlineSize);
    str.set(tmpXLineStr.toString());
    return xlineSize;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
        
//      newSize = in.readLine(value, maxLineLength,
//          Math.max(maxBytesToConsume(pos), maxLineLength));
        newSize = readXLine(value, maxLineLength,
              Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    //System.out.println("nextKeyValue: " + key.toString() + " - " + value.toString());
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
      }
    }
  }
}
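
The record-assembly loop in readXLine above can be tried outside Hadoop. 
Below is a minimal sketch, assuming plain java.io readers instead of Hadoop's 
LineReader and leaving out the byte counting. SeparatorRecordSketch, 
readRecord and readAll are hypothetical names used only for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone illustration of separator-delimited records.
class SeparatorRecordSketch {
    static final String SEPARATOR =
        "=================================================================";

    // Reads lines up to and including the next line that contains the
    // separator and returns them joined with "\n". Returns null at end of input.
    static String readRecord(BufferedReader in) throws IOException {
        StringBuilder record = new StringBuilder();
        String line;
        while ((line = in.readLine()) != null) {
            record.append(line).append('\n');
            if (line.contains(SEPARATOR)) {
                break;
            }
        }
        return record.length() == 0 ? null : record.toString();
    }

    // Splits a whole text into records by calling readRecord repeatedly.
    static List<String> readAll(String text) {
        List<String> records = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new StringReader(text))) {
            String record;
            while ((record = readRecord(in)) != null) {
                records.add(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    public static void main(String[] args) {
        String text = "a=1\nb=2\n" + SEPARATOR + "\nc=3\n" + SEPARATOR + "\n";
        System.out.println(readAll(text).size() + " records");
    }
}
```

Each returned record keeps its trailing separator line, as in readXLine, so 
the loader still has to strip or ignore that line when it builds the tuple.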

package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;

import org.apache.hadoop.mapreduce.lib.input.*;


/**
 * XRecordInputFormat, which groups a variable number of input lines into one split.
 *
 * In many "pleasantly" parallel applications, each process/mapper 
 * processes the same input file (s), but with computations are 
 * controlled by different parameters.(Referred to as "parameter sweeps").
 * One way to achieve this, is to specify a set of parameters 
 * (one set per line) as input in a control file 
 * (which is the input path to the map-reduce application,
 * where as the input dataset is specified 
 * via a config variable in JobConf.).
 * 
 * The NLineInputFormat can be used in such applications, that splits 
 * the input file such that by default, one line is fed as
 * a value to one map task, and key is the offset.
 * i.e. (k,v) is (LongWritable, Text).
 * The location hints will span the whole mapred cluster.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class XRecordInputFormat extends FileInputFormat<LongWritable, Text> { 
//  public static final String LINES_PER_MAP = 
//    "mapreduce.input.lineinputformat.linespermap";
        public static int maxNumRecords = 10000;
        //public static int currNumRecords = 10000;

  public static final String SEPARATE_STR =
      "=================================================================";
        
  public RecordReader<LongWritable, Text> createRecordReader (
      InputSplit genericSplit, TaskAttemptContext context) 
      throws IOException {
    context.setStatus(genericSplit.toString());
    //return new LineRecordReader()
    XLineRecordReader xRreader = new XLineRecordReader();
    xRreader.initialize(genericSplit, context);
    return xRreader;
  }
  
  protected boolean isSplitable(JobContext context, Path filename) {
    //System.out.println("isSplitable: true");
    return true;
    //return false;
  }

  /** 
   * Logically splits the set of input files for the job, grouping up to
   * maxNumRecords separator-delimited records of the input into one split.
   * 
   * @see FileInputFormat#getSplits(JobContext)
   */
  public List<InputSplit> getSplits(JobContext job)
  throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //int numLinesPerSplit = getNumLinesPerSplit(job);
    for (FileStatus status : listStatus(job)) {
      splits.addAll(getSplitsForFile(status,
        job.getConfiguration(), SEPARATE_STR));
    }
    //System.out.println("getSplits: " + splits.toString());
    return splits;
  }
  
  public static List<FileSplit> getSplitsForFile(FileStatus status,
      Configuration conf, String strSeparator ) throws IOException {
    List<FileSplit> splits = new ArrayList<FileSplit> ();
    Path fileName = status.getPath();
    if (status.isDir()) {
      throw new IOException("Not a file: " + fileName);
    }
    FileSystem  fs = fileName.getFileSystem(conf);
    LineReader lr = null;
  
    try {
      FSDataInputStream in  = fs.open(fileName);
      lr = new LineReader(in, conf);
      Text line = new Text();
      int numLines = 0;
      int numRecords = 0;
      long begin = 0;
      long length = 0;
      int num = -1;
      while ((num = lr.readLine(line)) > 0) {
        //System.out.println("getSplitsForFile: " + line.toString());
        numLines++;
        length += num;
        if (line.toString().indexOf(strSeparator) >=0 ) {
                numRecords ++;
        }
        if(numRecords == maxNumRecords)
        {
            splits.add(createFileSplit(fileName, begin, length));
            begin += length;
            length = 0;
            numLines = 0;
            numRecords = 0;
        }
      }
      if (numLines != 0) {
        splits.add(createFileSplit(fileName, begin, length));
      }
    } finally {
      if (lr != null) {
        lr.close();
      }
    }
    return splits; 
  }

  /**
   * NLineInputFormat uses LineRecordReader, which always reads
   * (and consumes) at least one character out of its upper split
   * boundary. So to make sure that each mapper gets N lines, we
   * move back the upper split limits of each split 
   * by one character here.
   * @param fileName  Path of file
   * @param begin  the position of the first byte in the file to process
   * @param length  number of bytes in InputSplit
   * @return  FileSplit
   */
  protected static FileSplit createFileSplit(Path fileName, long begin,
      long length) {

//    return (begin == 0) 
//    ? new FileSplit(fileName, begin, length - 1, new String[] {})
//    : new FileSplit(fileName, begin - 1, length, new String[] {});
    if (begin == 0) {
      //System.out.println("createFileSplit: " + fileName.toString() + " (" + begin + "," + (begin + length - 1) + ")");
      return new FileSplit(fileName, begin, length - 1, new String[] {});
    } else {
      //System.out.println("createFileSplit: " + fileName.toString() + " (" + (begin - 1) + "," + (begin + length) + ")");
      return new FileSplit(fileName, begin - 1, length, new String[] {});
    }
  }
  
  /**
   * Set the number of records per split
   * @param job the job to modify (unused; the value is kept in a static field)
   * @param numRecords the number of records per split
   */
  public static void setNumRecordsPerSplit(Job job, int numRecords) {
    //job.getConfiguration().setInt(LINES_PER_MAP, numLines);
          maxNumRecords = numRecords;
  }

  /**
   * Get the number of records per split
   * @param job the job (unused; the value is read from a static field)
   * @return the number of records per split
   */
  public static int getNumRecordsPerSplit(JobContext job) {
    //return job.getConfiguration().getInt(LINES_PER_MAP, 1);
          return maxNumRecords;
  }
}
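
The split computation in getSplitsForFile above can also be sketched without 
Hadoop. Below is a minimal version that works on an in-memory string and 
returns (begin, length) byte ranges, one per group of records. 
SplitRangeSketch and computeRanges are hypothetical names, and the one-byte 
boundary adjustment done in createFileSplit is deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone illustration of record-count based split ranges.
class SplitRangeSketch {

    // Walks the text line by line, counts lines that contain the separator,
    // and emits a {begin, length} byte range every recordsPerSplit records.
    // A final partial group is emitted as its own range.
    static List<long[]> computeRanges(String text, String separator,
                                      int recordsPerSplit) {
        List<long[]> ranges = new ArrayList<>();
        long begin = 0;
        long length = 0;
        int records = 0;
        int pos = 0;
        while (pos < text.length()) {
            int newline = text.indexOf('\n', pos);
            // Include the newline in the line so byte offsets stay exact.
            int next = (newline < 0) ? text.length() : newline + 1;
            String line = text.substring(pos, next);
            length += line.getBytes(StandardCharsets.UTF_8).length;
            if (line.contains(separator)) {
                records++;
            }
            if (records == recordsPerSplit) {
                ranges.add(new long[] {begin, length});
                begin += length;
                length = 0;
                records = 0;
            }
            pos = next;
        }
        if (length > 0) {
            ranges.add(new long[] {begin, length});
        }
        return ranges;
    }

    public static void main(String[] args) {
        String text = "a\n==\nb\n==\nc\n==\n";
        for (long[] r : computeRanges(text, "==", 2)) {
            System.out.println("begin=" + r[0] + " length=" + r[1]);
        }
    }
}
```

Because every range ends right after a separator line, a reader that starts 
at "begin" sees whole records only, which is the property the record reader 
relies on once the first-line skip is disabled.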
