Hi jorgen,

thank you for taking the time showing me the pseudo-code for a
queueInputFormat.  I will see if I can implement more details and use
it for my project.

-- Jim

On 10/30/07, Johnson, Jorgen <[EMAIL PROTECTED]> wrote:
> Hey all,
>
> Any reason why something like this wouldn't work?
>
> Create a QueueInputFormat, which provides a RecordReader implementation that 
> pops values off a globally accessible queue*.  This would require filling the 
> queue with values prior to loading the map/red job.  This would allow the 
> mappers to cram values back into the queue for further processing when 
> necessary.
>
> We could even avoid pre-loading the queue by combining this with some form of 
> hybrid InputFormat which first reads a normal input file, but after each 
> reader has finished reading its piece of the input file (and before it 
> receives the OK to quit), it would try to pull items off the queue for 
> further processing.
>
> At the bottom is a pseudo-implementation to help illustrate what I'm saying.
>
> -jorgenj
>
> *A simple db table or file store in hdfs might be ways to accomplish this 
> globally accessible queue, or something like SQS might work: 
> http://www.amazon.com/Simple-Queue-Service-home-page/b?ie=UTF8&node=13584001 
> =)
>
>
> /** InputFormat which allows map/red jobs to process values
>  * out of a globally accessible queue
>  *
>  * Note: The queue must be primed prior to kicking off any job which
>  * attempts to process values in the queue
>  */
> public class QueueInputFormat extends Object implements InputFormat {
>
>     public RecordReader getRecordReader(InputSplit split, JobConf job, 
> Reporter reporter) throws IOException {
>         return new QueueRecordReader();
>     }
>
>     public InputSplit[] getSplits(JobConf job, int numSplits) throws 
> IOException {
>         int overrideNumSplits = job.getInt("input.queue.hardcoded.num.maps", 
> numSplits);
>
>         InputSplit[] splits = new InputSplit[overrideNumSplits];
>         for ( int i = 0; i < numSplits; i++ ) {
>             splits[i] = new QueueInputSplit(i);
>         }
>
>         return splits;
>     }
>
>     public void validateInput(JobConf job) throws IOException {
>         throw new UnsupportedOperationException("Implement your validation, 
> if needed");
>     }
>
>     private static final class QueueRecordReader implements RecordReader {
>         public void close() throws IOException {
>             // close the queue
>         }
>
>         public WritableComparable createKey() {
>             //this might be the id of this object from the queue, or whatever 
> you want
>             return new LongWritable();
>         }
>
>         public Writable createValue() {
>             //replace this with a type representing what's in your queue
>             return new Text();
>         }
>
>         public long getPos() throws IOException {
>             return (queueIsExhausted()?1:0);
>         }
>
>         public float getProgress() throws IOException {
>             //could change this to report progress based on size of queue...
>             return (queueIsExhausted()?1:0);
>         }
>
>         public boolean next(Writable key, Writable value) throws IOException {
>            ((LongWritable)key).set(1);
>            ((Text)value).set(fetchValueFromQueue());
>
>            return true;
>         }
>
>         /** Use this to determine when the queue is exhausted, which in
>          * turn determines when the mappers will stop (no more items to 
> process)
>          */
>         private boolean queueIsExhausted() {
>             /*
>              * This could be as simple as checking if the queue is empty,
>              * or this could block until all mappers have signalled* that
>              * they are finished AND the queue is empty...
>              */
>             throw new UnsupportedOperationException("Depends on your queueing 
> impl");
>         }
>
>         /** Fetches a value from the queue */
>         private String fetchValueFromQueue() {
>             throw new UnsupportedOperationException("Depends on your queueing 
> impl");
>         }
>     }
>
>     private static final class QueueInputSplit implements InputSplit {
>         private final int id;
>
>         private QueueInputSplit(int id) {
>             this.id = id;
>         }
>
>         public long getLength() throws IOException {
>             return 1;
>         }
>
>         public String[] getLocations() throws IOException {
>             return new String[]{"queueIdentifierGoesHere?"+id};
>         }
>
>         public void readFields(DataInput in) throws IOException {
>             throw new UnsupportedOperationException("Real implementation 
> needs to handle this");
>         }
>
>         public void write(DataOutput out) throws IOException {
>             throw new UnsupportedOperationException("Real implementation 
> needs to handle this");
>         }
>
>     }
> }
>
>
>
> -----Original Message-----
> From: Jim the Standing Bear [mailto:[EMAIL PROTECTED]
> Sent: Tuesday, October 30, 2007 8:04 AM
> To: [email protected]
> Cc: [EMAIL PROTECTED]
> Subject: Re: can jobs be launched recursively within a mapper ?
>
> thank you for all the help. I think I am beginning to gain a more
> clear picture of hadoop.  I will try the file solution.
>
> On 10/29/07, Aaron Kimball <[EMAIL PROTECTED]> wrote:
> > If you modify the JobConf for a running job within the context of a
> > mapper, the changes will not propagate back to the other machines.
> > JobConfs are serialized to XML and then distributed to the mapping nodes
> > where they are read back into the running Java tasks. There is no
> > "refresh" function that I am aware of.
> >
> > - Aaron
> >
> > Jim the Standing Bear wrote:
> > > Thanks, Stu...  Maybe my mind is way off track - but I still sense a
> > > problem with the mapper sending feedbacks to the job controller.  That
> > > is, when a mapper has reached the terminal condition, how can it tell
> > > the job controller to stop?
> > >
> > > If I keep a JobConf object in the mapper, and set a property
> > > "stop.processing" to true when a mapping task has reached the terminal
> > > condition, will it cause synchronization problems?  There could be
> > > other mapping tasks that still wish to go on?
> > >
> > > I tried to find a way so that the job controller can open the file in
> > > the output path at the end of the loop to read the contents; but thus
> > > far, I haven't seen a way to achieve this.
> > >
> > > Does this mean I have hit a dead-end?
> > >
> > > -- Jim
> > >
> > >
> > >
> > > On 10/29/07, Stu Hood <[EMAIL PROTECTED]> wrote:
> > >
> > >> The iteration would take place in your control code (your 'main' method, 
> > >> as shown in the examples).
> > >>
> > >> In order to prevent records from looping infinitely, each iteration 
> > >> would need to use a separate output/input directory.
> > >>
> > >> Thanks,
> > >> Stu
> > >>
> > >>
> > >> -----Original Message-----
> > >> From: Jim the Standing Bear <[EMAIL PROTECTED]>
> > >> Sent: Monday, October 29, 2007 5:45pm
> > >> To: [email protected]
> > >> Subject: Re: can jobs be launched recursively within a mapper ?
> > >>
> > >> thanks, Owen and David,
> > >>
> > >> I also thought of making a queue so that I can push catalog names to
> > >> the end of it, while the job control loop keeps removing items off the
> > >> queue until there is no more left.
> > >>
> > >> However, the problem is I don't see how I can do so within the
> > >> map/reduce context.  All the code examples are one-shot deals and
> > >> there is no iteration involved.
> > >>
> > >> Furthermore, what David said made sense, but to avoid infinite loop,
> > >> the code must remove the record it just read from the input file.  How
> > >> do I do that using hadoop's fs?  or does hadoop take care of it
> > >> automatically?
> > >>
> > >> -- Jim
> > >>
> > >>
> > >>
> > >> On 10/29/07, David Balatero <[EMAIL PROTECTED]> wrote:
> > >>
> > >>> Aren't these questions a little advanced for a bear to be asking?
> > >>> I'll be here all night...
> > >>>
> > >>> But seriously, if your job is inherently recursive, one possible way
> > >>> to do it would be to make sure that you output in the same format
> > >>> that you input. Then you can keep re-reading the outputted file back
> > >>> into a new map/reduce job, until you hit some base case and you
> > >>> terminate. I've had a main method before that would kick off a bunch
> > >>> of jobs in a row -- but I wouldn't really recommend starting another
> > >>> map/reduce job in the scope of a running map() or reduce() method.
> > >>>
> > >>> - David
> > >>>
> > >>>
> > >>> On Oct 29, 2007, at 2:17 PM, Jim the Standing Bear wrote:
> > >>>
> > >>>
> > >>>> then
> > >>>>
> > >>>
> > >> --
> > >> --------------------------------------
> > >> Standing Bear Has Spoken
> > >> --------------------------------------
> > >>
> > >>
> > >>
> > >>
> > >
> > >
> > >
> >
>
>
> --
> --------------------------------------
> Standing Bear Has Spoken
> --------------------------------------
>


-- 
--------------------------------------
Standing Bear Has Spoken
--------------------------------------

Reply via email to