[Pig Wiki] Update of "LoadStoreRedesignProposal" by PradeepKamath

Apache Wiki
Mon, 02 Nov 2009 15:24:03 -0800

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 
notification.

The "LoadStoreRedesignProposal" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=15&rev2=16

--------------------------------------------------

  The '''!LoadCaster''' interface will include bytesToInt, bytesToLong, etc. 
functions
  currently in !LoadFunc.  UTF8!StorageConverter will implement this interface.
  
- Open Question: Should the methods to convert to a Bag, Tuple and Map take a 
Schema (ResourceSchema?) argument? 
+ '''Open Question''': Should the methods to convert to a Bag, Tuple and Map 
take a Schema (ResourceSchema?) argument? 
  
  
  '''!LoadMetadata'''
@@ -425, +425 @@

  result.
  
  Since Pig still needs to add information to !InputSplits, user provided 
!InputFormats and !InputSplits cannot be used directly.  Instead, the
- proposal is to change !PigInputFormat to contain an !InputFormat.  
!PigInputFormat will return !PigInputSplits, each of which contain an
+ proposal is to change !PigInputFormat to represent the job's !InputFormat to 
!Hadoop and to handle internally the complexity of multiple inputs and hence 
multiple !InputFormats.  !PigInputFormat will return !PigSplits, each of which 
contains an
- !InputSplit.  In addition, !PigInputSplit will contain the necessary 
information to allow Pig to correctly address tuples to the correct data
+ !InputSplit.  In addition, !PigSplit will contain the necessary information 
to allow Pig to route tuples to the correct data
  processing pipeline.
  
- In order to support arbitrary Hadoop !InputFormats, it will be necessary to 
construct a load function, !InputFormatLoader, that will take an
+ In order to support arbitrary Hadoop !InputFormats, Pig can provide a load 
function, !InputFormatLoader, that will take an
- !InputFormat as a constructor argument.  When asked by Pig which !InputFormat 
to use, it will return the one indicated by the user.  Its call to
+ !InputFormat as a constructor argument.  Only !InputFormats that have 
zero-argument constructors can be supported, since Pig will try to instantiate 
the supplied !InputFormat using reflection.  When asked by Pig which 
!InputFormat to use, it will return the one indicated by the user.  Its call to
  getNext will then take the key and value provided by the associated 
!RecordReader and construct a two field tuple.  These types will be converted
  to Pig types as follows:
  
@@ -445, +445 @@

  || !BooleanWritable || int        || In the future if Pig exposes boolean as 
a first class type, this would change to boolean ||
  || !ByteWritable    || int        ||                                          
                                                ||
  || !NullWritable    || null       ||                                          
                                                ||
- || All others       || byte array ||                                          
                                                ||
+ || All others       || byte array || How do we construct a byte array from 
arbitrary types?                                   ||
  
  Since the format of any other type is unknown to Pig and cannot be 
generalized, it does not make sense to provide casts from byte array to pig
  types via a !LoadCaster.  If users wish to use an !InputFormat that uses 
types beyond these and cast them to Pig types, they can extend the
@@ -469, +469 @@

  
  Positioning information in an !InputSplit presents a problem.  Hadoop 0.18 
has a getPos call in the !InputSplit, but it has been removed in 0.20.
  The reason is that input from files can generally be assigned a position, 
though it may not always be
- accurate, as in the bzip case.  But some input formats position may not have 
meaning.  Even if Pig does not switch to using !InputFormats it will
+ accurate, as in the bzip case.  But for some input formats position may not 
have meaning.  Even if Pig does not switch to using !InputFormats it will
  have to deal with this issue, just as MR has.
  
+ These changes will affect the !SamplableLoader interface.  Currently it uses 
skip and getPos to move the underlying stream so that it can pick
+ up a sample of tuples out of a block.  Since it would sit atop !InputFormat 
it would no longer have access to the underlying stream.  It would be
+ changed instead to skip a number of tuples. 
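
A minimal sketch of sampling by skipping tuples rather than moving an underlying stream. The class `SkipSampler` and its parameters are hypothetical, and a plain `Iterator` stands in for repeated getNext calls; this is not the actual !SamplableLoader interface.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Pig: collects a sample by discarding a
// fixed number of records between picks instead of using skip/getPos.
final class SkipSampler {
    private SkipSampler() {}

    // Returns up to maxSamples records, discarding `skip` records after each pick.
    static <T> List<T> sample(Iterator<T> records, int skip, int maxSamples) {
        if (skip < 0 || maxSamples < 0) {
            throw new IllegalArgumentException("skip and maxSamples must be non-negative");
        }
        List<T> samples = new ArrayList<>();
        while (samples.size() < maxSamples && records.hasNext()) {
            samples.add(records.next());
            // Discard the next `skip` records, stopping early if input runs out.
            for (int i = 0; i < skip && records.hasNext(); i++) {
                records.next();
            }
        }
        return samples;
    }
}
```

If the input is exhausted first, the sketch simply returns fewer than maxSamples records.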
+ 
  However, in some places Pig needs this position information.  In particular, 
when building an index for a merge join, Pig needs a way to mark a
+ location in an input while building the index and then return to that 
position during the join. In this new proposal, the merge join index will 
contain the filename and the split index (the index of the split in the List 
returned by InputFormat.getSplits()). At run time the merge join code will then 
seek to the appropriate split in the file and process from that split on. For 
this to work, the assumption is that starting from a split and reading through 
to the last split yields sorted data, i.e. that the splits returned by 
getSplits() preserve ordering. Since this cannot be guaranteed in general, the 
proposal is to sample the first and last keys in each split and record both 
values in the index entry for that split. The index is then sorted on both the 
first and last key. When seeking into the right file based on the join key 
during merge join processing, the implementation will read the relevant splits 
in the right file as indicated by the index (reading the splits from the 
matching index entry to the last index entry).
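
The index with one entry per split, holding the first and last sampled keys, can be sketched roughly as follows. The names `MergeJoinIndex`, `Entry` and `splitsToRead` are hypothetical. Keys are plain Strings for brevity, and the "matching" entry is assumed to be the first one whose last key is not smaller than the join key; the proposal does not spell out that rule.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the proposed merge join index, not Pig code.
final class MergeJoinIndex {
    // One entry per split: where the split lives and the first/last key sampled from it.
    static final class Entry {
        final String fileName;
        final int splitIndex;   // position of the split in the getSplits() list
        final String firstKey;
        final String lastKey;

        Entry(String fileName, int splitIndex, String firstKey, String lastKey) {
            this.fileName = fileName;
            this.splitIndex = splitIndex;
            this.firstKey = firstKey;
            this.lastKey = lastKey;
        }
    }

    private final List<Entry> entries;

    // Copies the entries and sorts them on first key, then last key, so the
    // index does not depend on getSplits() returning splits in key order.
    MergeJoinIndex(List<Entry> perSplitEntries) {
        this.entries = new ArrayList<>(perSplitEntries);
        this.entries.sort(Comparator.comparing((Entry e) -> e.firstKey)
                .thenComparing(e -> e.lastKey));
    }

    // Assumption: the matching entry is the first one with lastKey >= joinKey.
    // Returns that entry and every later one; empty if no split can hold the key.
    List<Entry> splitsToRead(String joinKey) {
        for (int i = 0; i < entries.size(); i++) {
            if (entries.get(i).lastKey.compareTo(joinKey) >= 0) {
                return new ArrayList<>(entries.subList(i, entries.size()));
            }
        }
        return new ArrayList<>();
    }
}
```

A linear scan keeps the sketch short; a real index would more likely use a binary search over the sorted entries.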
- location in an input while building the index and then return to that 
position during the join.  This issue will have to be pursued with the MR
- team to see if there is a way to provide this functionality for input types 
where it makes sense.  If they are unwilling to provide it, or it will
- take them some time to provide it, we could instead create our own 
!SeekableInputFormat that would define a way to mark and seek.  Zebra could
- implement this for their !InputFormat.  The Pig team could extend 
!TextInputFormat to implement it for text files.  !PigStorage would then use 
this
- new !SeekableTextInputFormat rather than using !TextInputFormat directly.
- 
- These changes will affect the !SamplableLoader interface.  Currently it uses 
skip and getPos to move the underlying stream so that it can pick
- up a sample of tuples out of a block.  Since it would sit atop !InputFormat 
it would no longer have access to the underlying stream.  It could be
- changed instead to skip a number of tuples.  Rather than skipping a uniform 
amount inside the block it could skip a random number of tuples each
- time.  This will result in a better sample, but also risks running out of 
data before obtaining the desired number of tuples.
  
  These changes will also affect loaders that need to read a record in order to 
determine their schema, such as !BinStorage or a JSON loader.
  These loaders will need to know how to read at least the first record on 
their own, without the benefit of the underlying !InputFormat, since
  they will need to call this on the front end where an !InputFormat has not 
yet been used.
  
  In addition to opening files as part of Map-Reduce, Pig loaders also open 
files on the side in MR jobs.  The new load interface needs to be able to
- open these side files as well.  According to Arun, this is doable but 
creating a new instance of the appropriate !InputFormat, calls getSplits, and
+ open these side files as well.  For this we would need to create a new 
instance of the appropriate !InputFormat, call getSplits, and
- then creating a !RecordReader on it.
+ then iterate over the splits, creating a RecordReader for each split and 
processing the data it returns before moving to the next split.
  
  '''Performance concerns.'''
  
@@ -507, +502 @@

  == StoreFunc and OutputFormat Interaction ==
  In the same way that !LoadFunc currently duplicates some functionality of 
!InputFormat, !StoreFunc duplicates some functionality of !OutputFormat.  
!StoreFunc will be changed to deal primarily with converting a tuple to a key 
value pair that can be stored by Hadoop.
  
- To support arbitrary !OutputFormats, a new storage function 
!OutputFormatStorage will be written that will take an !OutputFormat as a 
constructor
+ To support arbitrary !OutputFormats, a new storage function 
!OutputFormatStorage could be written that will take an !OutputFormat as a 
constructor
- argument.  Tuples to be stored by this storage function must have either one 
or two fields.   If they have two fields, the first of will be taken
+ argument. Only !OutputFormats that have zero-argument constructors can be 
supported, since Pig will try to instantiate the supplied !OutputFormat using 
reflection. Tuples to be stored by this storage function must have either one 
or two fields.   If they have two fields, the first will be taken
  to be the key, and the second the value.  If they have one, the key will be 
set to null and the value will be taken from the single field.  Data
  type conversion on this data will be done in the same way as noted above for 
!InputFormatLoader.
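
The zero-argument constructor restriction follows from how reflective instantiation works. Below is a minimal sketch using only standard Java reflection; the helper `ZeroArgInstantiator` is hypothetical and is not how Pig actually loads an !InputFormat or !OutputFormat.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper, not part of Pig: creates an instance of a class given
// only its name, which works only when a public zero-argument constructor exists.
final class ZeroArgInstantiator {
    private ZeroArgInstantiator() {}

    static <T> T instantiate(String className, Class<T> expectedType) {
        try {
            // asSubclass throws ClassCastException if the class is not an expectedType.
            Class<? extends T> clazz = Class.forName(className).asSubclass(expectedType);
            // getConstructor() with no parameter types looks up the public
            // zero-argument constructor and throws NoSuchMethodException if absent.
            Constructor<? extends T> ctor = clazz.getConstructor();
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Cannot instantiate " + className + " with a zero-argument constructor", e);
        }
    }
}
```

A class whose only constructors take arguments fails at the getConstructor() lookup, because the caller has nothing but a class name to go on.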
  
  Open Questions:
-  1. Does all this force us to switch to Hadoop for local mode as well?  We 
aren't opposed to using Hadoop for local mode it just needs to get reasonable 
fast.  Can we use !InputFormat ''et. al.'' on local files without using the 
whole HDFS structure?  '''Answer''' According to Hadoop documentation 
!TextInputFormat works on local files as well as hdfs files.  We may need to 
catch that we are in local mode and change the filename to `file://`
+  1. Does all this force us to switch to Hadoop for local mode as well?  We 
aren't opposed to using Hadoop for local mode; it just needs to get reasonably 
fast.  Can we use !InputFormat ''et al.'' on local files without using the 
whole HDFS structure?  '''Answer''': According to Hadoop documentation, 
!TextInputFormat works on local files as well as HDFS files.  We may need to 
detect that we are in local mode and change the filename to `file://`, OR 
change to using Hadoop's local mode.
-  1. How will we worked with compressed files?  !FileInputFormat already works 
with bzip and gzip compressed files, producing reasonable splits.  !PigStorage 
will be reworked to depend on !FileInputFormat (or a descendant thereof, see 
next item) and should therefore be able to use this functionality.
+  1. How will we work with compressed files?  !FileInputFormat already works 
with bzip and gzip compressed files, producing reasonable splits.  !PigStorage 
will be reworked to depend on !FileInputFormat (or a descendant thereof, see 
next item) and should therefore be able to use this functionality. Currently 
Pig supports gz/bzip for arbitrary loadfunc/storefunc combinations. With this 
proposal, gz/bzip format will only be supported for load/store using PigStorage.
-  1. How will the need for mark and seek in index construction for merge join 
be handled?  In the long term we'd like Hadoop to handle this for us by 
creating a !SeekableInputFormat that would add this functionality.  In the 
meantime we can extend !FileInputFormat to !PigFileInputFormat.  We can add 
getPos() call to this class that will provide a position to start reading at to 
find the tuple being indexed.  Note that this position will not necessarily be 
the exact position of the tuple, but a position from which the tuple can be 
found.  We can also change the getSplits call on this method to return a split 
that is specific to a given position so that it can be used during the join.
+ 
+ 
+ === Implementation details and status ===
+ 
+ ==== Current status ====
+ A branch, 'load-store-redesign' 
(http://svn.apache.org/repos/asf/hadoop/pig/branches/load-store-redesign), has 
been created to undertake work on this proposal. As of today (Nov 2, 2009) this 
branch has simple load-store working for PigStorage and BinStorage. Joins on 
multiple inputs and multi store queries with multi query optimization also 
work. Some of the recent changes in the proposal above (the changes noted under 
Nov 2, 2009 in the Changes section below) have not been incorporated. A list of 
remaining tasks (which may not be comprehensive) is given in a subsection below.
+ 
+ ==== Notes on implementation details ====
+ 
+ ==== Remaining Tasks ====
+  * BinStorage needs to implement LoadMetadata's getSchema() to replace 
current determineSchema()
+  * piggybank loaders/storers need to be ported
+  * fix lineage code to use LoadCaster instead of LoadFunc
+  * local mode needs to be ported
+  * PigDump needs to be ported
+  * poload needs to be ported
+  * Need to handle passing loadfunc-specific info between different instances 
of loadfunc (different instances in the front end, and 
+ between front end and back end; we need what is required in PIG-602). 
setPartitionFilter() and pushOperators(), for example, need 
+ this: these methods are called in the front end but the information passed 
is needed in the back end.
+  * For ResourceSchema to be effectively used for communicating schema, we 
must fix the two level access issues with 
+ schema of bags in current schema before we make these changes, otherwise that 
same contagion will afflict us here. 
+  * Input/Output handler code in streaming needs to be ported 
+  * split by file will have to be removed from the language
+  * fix code with FIXME in comment relating to load-store redesign
+  * Decide on what we should do with ReversibleLoadFunc and multiquery 
optimization
+ 
+ 
  
  == Changes ==
  Sept 23 2009, Gates
@@ -546, +567 @@

   * Changed setSchema() to checkSchema since this method is called only to 
allow StoreFunc to check
   * Removed allFinished() - same functionality already present in 
!OutputCommitter.cleanupJob()
  
+ Added a new section 'Implementation details and status'
+