Apache Wiki
Mon, 02 Nov 2009 15:24:03 -0800
Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.
The "LoadStoreRedesignProposal" page has been changed by PradeepKamath.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=15&rev2=16

--------------------------------------------------

  The '''!LoadCaster''' interface will include bytesToInt, bytesToLong, etc. functions currently in !LoadFunc. UTF8!StorageConverter will implement this interface.
- Open Question: Should the methods to convert to a Bag, Tuple and Map take a Schema (ResourceSchema?) argument?
+ '''Open Question''': Should the methods to convert to a Bag, Tuple and Map take a Schema (ResourceSchema?) argument?
  '''!LoadMetadata'''
@@ -425, +425 @@
  result. Since Pig still needs to add information to !InputSplits, user provided !InputFormats and !InputSplits cannot be used directly. Instead, the
- proposal is to change !PigInputFormat to contain an !InputFormat. !PigInputFormat will return !PigInputSplits, each of which contain an
+ proposal is to change !PigInputFormat to represent the job's !InputFormat to !Hadoop and internally to handle the complexity of multiple inputs and hence multiple !InputFormats. !PigInputFormat will return !PigSplits each of which contain an
- !InputSplit. In addition, !PigInputSplit will contain the necessary information to allow Pig to correctly address tuples to the correct data
+ !InputSplit. In addition, !PigSplit will contain the necessary information to allow Pig to correctly address tuples to the correct data
  processing pipeline.
- In order to support arbitrary Hadoop !InputFormats, it will be necessary to construct a load function, !InputFormatLoader, that will take an
+ In order to support arbitrary Hadoop !InputFormats, Pig can provide a load function, !InputFormatLoader, that will take an
- !InputFormat as a constructor argument. When asked by Pig which !InputFormat to use, it will return the one indicated by the user. Its call to
+ !InputFormat as a constructor argument.
+ Only !InputFormats which have zero-argument constructors can be supported since Pig will try to instantiate the supplied !InputFormat using reflection. When asked by Pig which !InputFormat to use, it will return the one indicated by the user. Its call to
  getNext will then take the key and value provided by the associated !RecordReader and construct a two-field tuple. These types will be converted to Pig types as follows:
@@ -445, +445 @@
  || !BooleanWritable || int || In the future if Pig exposes boolean as a first class type, this would change to boolean ||
  || !ByteWritable || int || ||
  || !NullWritable || null || ||
- || All others || byte array || ||
+ || All others || byte array || How do we construct a byte array from arbitrary types? ||
  Since the format of any other types is unknown to Pig and cannot be generalized, it does not make sense to provide casts from byte array to pig types via a !LoadCaster. If users wish to use an !InputFormat that uses types beyond these and cast them to Pig types, they can extend the
@@ -469, +469 @@
  Positioning information in an !InputSplit presents a problem. Hadoop 0.18 has a getPos call in the !InputSplit, but it has been removed in 0.20. The reason is that input from files can generally be assigned a position, though it may not always be
- accurate, as in the bzip case. But some input formats position may not have meaning. Even if Pig does not switch to using !InputFormats it will
+ accurate, as in the bzip case. But for some input formats position may not have meaning. Even if Pig does not switch to using !InputFormats it will
  have to deal with this issue, just as MR has.
+ These changes will affect the !SamplableLoader interface. Currently it uses skip and getPos to move the underlying stream so that it can pick
+ up a sample of tuples out of a block. Since it would sit atop !InputFormat it would no longer have access to the underlying stream. It would be
+ changed instead to skip a number of tuples.
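As a rough illustration of sampling by skipping tuples rather than moving a stream position, the sketch below keeps one record and then skips a fixed number of records, using only a plain iterator. The class and method names (`SkipSampler`, `sample`) and the fixed skip count are assumptions made for this example; they are not part of the proposed !SamplableLoader interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipSampler {

    /**
     * Hypothetical helper: collects up to sampleSize records from the iterator,
     * skipping `skip` records after each record that is kept. It never touches
     * a stream position; it only calls next() on the record iterator.
     */
    public static <T> List<T> sample(Iterator<T> records, int sampleSize, int skip) {
        List<T> out = new ArrayList<>();
        while (out.size() < sampleSize && records.hasNext()) {
            out.add(records.next());
            // Skip ahead by tuple count; stop early if the input runs out.
            for (int i = 0; i < skip && records.hasNext(); i++) {
                records.next();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> tuples = Arrays.asList(
                "t0", "t1", "t2", "t3", "t4", "t5", "t6", "t7", "t8", "t9");
        // Keeps t0, t3 and t6: one record kept, two skipped, three times.
        System.out.println(sample(tuples.iterator(), 3, 2)); // prints [t0, t3, t6]
    }
}
```

If the input runs out before the requested number of samples is reached, the sketch simply returns the shorter sample.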
+ However, in some places Pig needs this position information. In particular, when building an index for a merge join, Pig needs a way to mark a
+ location in an input while building the index and then return to that position during the join. In this new proposal, the merge join index will contain the filename and the split index (the index of the split in the List returned by InputFormat.getSplits()). At run time the merge join code will then seek to the right split in the file and process from that split on. For this to work, the assumption is that if we start from a split and read from there on to the last split, we get sorted data, i.e. the splits returned by getSplits() preserve ordering. Since this cannot be guaranteed in general, the proposal is to sample the first and last keys in each split and record both values in the index entry for that split. The index is then sorted on both the first and last key. While seeking into the right file based on the join key during merge join processing, the implementation will then read the relevant splits in the right file as indicated in the index (reading the splits from the matching index entry to the last index entry).
- location in an input while building the index and then return to that position during the join. This issue will have to be pursued with the MR
- team to see if there is a way to provide this functionality for input types where it makes sense. If they are unwilling to provide it, or it will
- take them some time to provide it, we could instead create our own !SeekableInputFormat that would define a way to mark and seek. Zebra could
- implement this for their !InputFormat. The Pig team could extend !TextInputFormat to implement it for text files. !PigStorage would then use this
- new !SeekableTextInputFormat rather than using !TextInputFormat directly.
-
- These changes will affect the !SamplableLoader interface.
- Currently it uses skip and getPos to move the underlying stream so that it can pick
- up a sample of tuples out of a block. Since it would sit atop !InputFormat it would no longer have access to the underlying stream. It could be
- changed instead to skip a number of tuples. Rather than skipping a uniform amount inside the block it could skip a random number of tuples each
- time. This will result in a better sample, but also risks running out of data before obtaining the desired number of tuples.
  These changes will also affect loaders that need to read a record in order to determine their schema, such as !BinStorage or a JSON loader. These loaders will need to know how to read at least the first record on their own, without the benefit of the underlying !InputFormat, since they will need to call this on the front end where an !InputFormat has not yet been used.
  In addition to opening files as part of Map-Reduce, Pig loaders also open files on the side in MR jobs. The new load interface needs to be able to
- open these side files as well. According to Arun, this is doable but creating a new instance of the appropriate !InputFormat, calls getSplits, and
+ open these side files as well. For this we would need to create a new instance of the appropriate !InputFormat, call getSplits, and
- then creating a !RecordReader on it.
+ then iterate over the splits and for each split create a RecordReader, process the data returned by the RecordReader, and then move to the next split.
  '''Performance concerns.'''
@@ -507, +502 @@
  == StoreFunc and OutputFormat Interaction ==
  In the same way that !LoadFunc currently duplicates some functionality of !InputFormat, !StoreFunc duplicates some functionality of !OutputFormat. !StoreFunc will be changed to deal primarily with converting a tuple to a key value pair that can be stored by Hadoop.
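To make the tuple-to-pair conversion concrete, here is a minimal sketch of the one-or-two-field rule that this section proposes for !OutputFormatStorage: two fields become key and value, and a single field becomes the value with a null key. The class name `TupleToPair`, the method `toKeyValue`, and the use of a plain `List<Object>` in place of a Pig tuple are all assumptions made for illustration; no Pig or Hadoop types are used.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class TupleToPair {

    /**
     * Hypothetical conversion of a tuple (modelled as a List) to a key/value pair.
     * One field: key is null, value is that field.
     * Two fields: first is the key, second is the value.
     * Any other arity is rejected.
     */
    public static Map.Entry<Object, Object> toKeyValue(List<Object> tuple) {
        switch (tuple.size()) {
            case 1:
                return new SimpleEntry<Object, Object>(null, tuple.get(0));
            case 2:
                return new SimpleEntry<Object, Object>(tuple.get(0), tuple.get(1));
            default:
                throw new IllegalArgumentException(
                        "tuple must have one or two fields, got " + tuple.size());
        }
    }

    public static void main(String[] args) {
        Map.Entry<Object, Object> two = toKeyValue(Arrays.<Object>asList("user42", 7));
        System.out.println(two.getKey() + " -> " + two.getValue()); // prints user42 -> 7

        Map.Entry<Object, Object> one = toKeyValue(Arrays.<Object>asList("only-value"));
        System.out.println(one.getKey() + " -> " + one.getValue()); // prints null -> only-value
    }
}
```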
- To support arbitrary !OutputFormats, a new storage function !OutputFormatStorage will be written that will take an !OutputFormat as a constructor
+ To support arbitrary !OutputFormats, a new storage function !OutputFormatStorage could be written that will take an !OutputFormat as a constructor
- argument. Tuples to be stored by this storage function must have either one or two fields. If they have two fields, the first of will be taken
+ argument. Only !OutputFormats which have zero-argument constructors can be supported since Pig will try to instantiate the supplied !OutputFormat using reflection. Tuples to be stored by this storage function must have either one or two fields. If they have two fields, the first will be taken
  to be the key, and the second the value. If they have one, the key will be set to null and the value will be taken from the single field. Data type conversion on this data will be done in the same way as noted above for !InputFormatLoader.
  Open Questions:
- 1. Does all this force us to switch to Hadoop for local mode as well? We aren't opposed to using Hadoop for local mode it just needs to get reasonable fast. Can we use !InputFormat ''et. al.'' on local files without using the whole HDFS structure? '''Answer''' According to Hadoop documentation !TextInputFormat works on local files as well as hdfs files. We may need to catch that we are in local mode and change the filename to `file://`
+ 1. Does all this force us to switch to Hadoop for local mode as well? We aren't opposed to using Hadoop for local mode; it just needs to get reasonably fast. Can we use !InputFormat ''et al.'' on local files without using the whole HDFS structure? '''Answer''' According to Hadoop documentation !TextInputFormat works on local files as well as HDFS files. We may need to catch that we are in local mode and change the filename to `file://` OR change to using Hadoop's local mode
- 1. How will we worked with compressed files?
- !FileInputFormat already works with bzip and gzip compressed files, producing reasonable splits. !PigStorage will be reworked to depend on !FileInputFormat (or a descendant thereof, see next item) and should therefore be able to use this functionality.
+ 1. How will we work with compressed files? !FileInputFormat already works with bzip and gzip compressed files, producing reasonable splits. !PigStorage will be reworked to depend on !FileInputFormat (or a descendant thereof, see next item) and should therefore be able to use this functionality. Currently Pig supports gz/bzip for arbitrary loadfunc/storefunc combinations. With this proposal, gz/bzip format will only be supported for load/store using PigStorage.
- 1. How will the need for mark and seek in index construction for merge join be handled? In the long term we'd like Hadoop to handle this for us by creating a !SeekableInputFormat that would add this functionality. In the meantime we can extend !FileInputFormat to !PigFileInputFormat. We can add getPos() call to this class that will provide a position to start reading at to find the tuple being indexed. Note that this position will not necessarily be the exact position of the tuple, but a position from which the tuple can be found. We can also change the getSplits call on this method to return a split that is specific to a given position so that it can be used during the join.
+
+
+ === Implementation details and status ===
+
+ ==== Current status ====
+ A branch, 'load-store-redesign' (http://svn.apache.org/repos/asf/hadoop/pig/branches/load-store-redesign), has been created to undertake work on this proposal. As of today (Nov 2, 2009) this branch has simple load-store working for PigStorage and BinStorage. Joins on multiple inputs and multi-store queries with multi-query optimization also work. Some of the recent changes in the proposal above (the changes noted under Nov 2, 2009 in the Changes below) have not been incorporated.
+ A list of remaining tasks (possibly not comprehensive) is given in a subsection below.
+
+ ==== Notes on implementation details ====
+
+ ==== Remaining Tasks ====
+ * BinStorage needs to implement LoadMetadata's getSchema() to replace current determineSchema()
+ * piggybank loaders/storers need to be ported
+ * fix lineage code to use LoadCaster instead of LoadFunc
+ * local mode needs to be ported
+ * PigDump needs to be ported
+ * poload needs to be ported
+ * Need to handle passing loadfunc-specific info between different instances of loadfunc (different instances in the front end and
+ between front end and back end; we need what is required in PIG-602). setPartitionFilter() and pushOperators(), for example, need
+ this: these methods are called in the front end but the information passed is needed in the back end
+ * For ResourceSchema to be effectively used for communicating schema, we must fix the two-level access issues with
+ schema of bags in current schema before we make these changes, otherwise that same contagion will afflict us here.
+ * Input/Output handler code in streaming needs to be ported
+ * split by file will have to be removed from the language
+ * fix code with FIXME in comment relating to load-store redesign
+ * Decide on what we should do with ReversibleLoadFunc and multiquery optimization
+
+
  == Changes ==
  Sept 23 2009, Gates
@@ -546, +567 @@
  * Changed setSchema() to checkSchema since this method is called only to allow StoreFunc to check
  * Removed allFinished() - same functionality already present in !OutputCommitter.cleanupJob()
+ Added a new section 'Implementation details and status'
+
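For readers following the merge join change in this revision, the split-based index can be sketched as follows: each entry records the filename, the split index, and the sampled first and last keys; the index is sorted on both keys; and a lookup returns the entries from the matching one to the end. This is only a sketch under assumptions. The names `MergeJoinIndexSketch`, `IndexEntry`, `buildIndex` and `splitsToRead` are invented for the example, keys are plain strings, and "matching entry" is interpreted here as the first entry whose last key is not smaller than the join key, which the proposal does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeJoinIndexSketch {

    /** Hypothetical index entry: one per split of the right-hand input. */
    public static final class IndexEntry {
        final String filename;
        final int splitIndex;   // position in the list returned by getSplits()
        final String firstKey;  // first key sampled from the split
        final String lastKey;   // last key sampled from the split

        public IndexEntry(String filename, int splitIndex, String firstKey, String lastKey) {
            this.filename = filename;
            this.splitIndex = splitIndex;
            this.firstKey = firstKey;
            this.lastKey = lastKey;
        }
    }

    /** Sorts a copy of the sampled entries by first key, then by last key. */
    public static List<IndexEntry> buildIndex(List<IndexEntry> sampled) {
        List<IndexEntry> index = new ArrayList<>(sampled);
        index.sort((a, b) -> {
            int c = a.firstKey.compareTo(b.firstKey);
            return c != 0 ? c : a.lastKey.compareTo(b.lastKey);
        });
        return index;
    }

    /**
     * Returns the entries to read for a join key: from the first entry whose
     * last key is >= joinKey (assumed meaning of "matching") to the last entry.
     */
    public static List<IndexEntry> splitsToRead(List<IndexEntry> index, String joinKey) {
        for (int i = 0; i < index.size(); i++) {
            if (index.get(i).lastKey.compareTo(joinKey) >= 0) {
                return new ArrayList<>(index.subList(i, index.size()));
            }
        }
        return new ArrayList<>(); // join key is beyond every split
    }

    public static void main(String[] args) {
        // Splits deliberately listed out of key order, as getSplits() might return them.
        List<IndexEntry> index = buildIndex(Arrays.asList(
                new IndexEntry("part-0", 1, "m", "r"),
                new IndexEntry("part-0", 0, "a", "f"),
                new IndexEntry("part-0", 2, "s", "z")));
        for (IndexEntry e : splitsToRead(index, "n")) {
            System.out.println(e.filename + " split " + e.splitIndex);
        }
        // prints "part-0 split 1" then "part-0 split 2"
    }
}
```

A linear scan keeps the sketch short; with overlapping splits the last keys need not be monotonic after sorting, so a binary search on the last key alone would not be safe without further assumptions.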