Hi Jorgen, thank you for taking the time to show me the pseudo-code for a QueueInputFormat. I will see if I can fill in the details and use it for my project.
-- Jim

On 10/30/07, Johnson, Jorgen <[EMAIL PROTECTED]> wrote:
> Hey all,
>
> Any reason why something like this wouldn't work?
>
> Create a QueueInputFormat, which provides a RecordReader implementation that
> pops values off a globally accessible queue*. This would require filling the
> queue with values prior to launching the map/red job. It would also allow the
> mappers to cram values back into the queue for further processing when
> necessary.
>
> We could even avoid pre-loading the queue by combining this with some form of
> hybrid InputFormat which first reads a normal input file, but after each
> reader has finished reading its piece of the input file (and before it
> receives the OK to quit), it would try to pull items off the queue for
> further processing.
>
> At the bottom is a pseudo-implementation to help illustrate what I'm saying.
>
> -jorgenj
>
> *A simple db table or a file store in HDFS might be ways to accomplish this
> globally accessible queue, or something like SQS might work:
> http://www.amazon.com/Simple-Queue-Service-home-page/b?ie=UTF8&node=13584001
> =)
>
>
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
>
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.io.WritableComparable;
> import org.apache.hadoop.mapred.InputFormat;
> import org.apache.hadoop.mapred.InputSplit;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.RecordReader;
> import org.apache.hadoop.mapred.Reporter;
>
> /** InputFormat which allows map/red jobs to process values
>  * out of a globally accessible queue
>  *
>  * Note: The queue must be primed prior to kicking off any job which
>  * attempts to process values in the queue
>  */
> public class QueueInputFormat implements InputFormat {
>
>     public RecordReader getRecordReader(InputSplit split, JobConf job,
>             Reporter reporter) throws IOException {
>         return new QueueRecordReader();
>     }
>
>     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
>         int overrideNumSplits = job.getInt("input.queue.hardcoded.num.maps", numSplits);
>
>         InputSplit[] splits = new InputSplit[overrideNumSplits];
>         for (int i = 0; i < overrideNumSplits; i++) {
>             splits[i] = new QueueInputSplit(i);
>         }
>
>         return splits;
>     }
>
>     public void validateInput(JobConf job) throws IOException {
>         // Implement your validation here if needed; for a pure queue there
>         // are no input files to check, so doing nothing lets the job submit.
>     }
>
>     private static final class QueueRecordReader implements RecordReader {
>         public void close() throws IOException {
>             // close the queue
>         }
>
>         public WritableComparable createKey() {
>             // this might be the id of this object from the queue, or whatever you want
>             return new LongWritable();
>         }
>
>         public Writable createValue() {
>             // replace this with a type representing what's in your queue
>             return new Text();
>         }
>
>         public long getPos() throws IOException {
>             return (queueIsExhausted() ? 1 : 0);
>         }
>
>         public float getProgress() throws IOException {
>             // could change this to report progress based on size of queue...
>             return (queueIsExhausted() ? 1 : 0);
>         }
>
>         public boolean next(Writable key, Writable value) throws IOException {
>             if (queueIsExhausted()) {
>                 return false;   // no more items: this is what stops the mapper
>             }
>             ((LongWritable) key).set(1);
>             ((Text) value).set(fetchValueFromQueue());
>             return true;
>         }
>
>         /** Use this to determine when the queue is exhausted, which in
>          * turn determines when the mappers will stop (no more items to process)
>          */
>         private boolean queueIsExhausted() {
>             /*
>              * This could be as simple as checking if the queue is empty,
>              * or this could block until all mappers have signalled* that
>              * they are finished AND the queue is empty...
>              */
>             throw new UnsupportedOperationException("Depends on your queueing impl");
>         }
>
>         /** Fetches a value from the queue */
>         private String fetchValueFromQueue() {
>             throw new UnsupportedOperationException("Depends on your queueing impl");
>         }
>     }
>
>     private static final class QueueInputSplit implements InputSplit {
>         private int id;
>
>         // the framework re-creates splits by reflection, so a no-arg
>         // constructor is needed before readFields() is called
>         public QueueInputSplit() {
>         }
>
>         public QueueInputSplit(int id) {
>             this.id = id;
>         }
>
>         public long getLength() throws IOException {
>             return 1;
>         }
>
>         public String[] getLocations() throws IOException {
>             return new String[] { "queueIdentifierGoesHere?" + id };
>         }
>
>         public void readFields(DataInput in) throws IOException {
>             id = in.readInt();
>         }
>
>         public void write(DataOutput out) throws IOException {
>             out.writeInt(id);
>         }
>     }
> }
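For what it's worth, wiring this into a job is mostly a matter of pointing the JobConf at the new InputFormat. Below is a minimal driver sketch, assuming the JobConf/JobClient API of this Hadoop generation; the split count, output path, and the identity mapper/reducer are placeholders, and "input.queue.hardcoded.num.maps" is the property that getSplits() above reads.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class QueueJobDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(QueueJobDriver.class);
        conf.setJobName("queue-driven-job");

        // read map inputs from the queue instead of from files
        conf.setInputFormat(QueueInputFormat.class);
        conf.setInt("input.queue.hardcoded.num.maps", 8);          // hypothetical split count

        conf.setMapperClass(IdentityMapper.class);                  // stand-in; plug in the real mapper
        conf.setReducerClass(IdentityReducer.class);
        conf.setOutputKeyClass(LongWritable.class);                 // matches createKey()/createValue() above
        conf.setOutputValueClass(Text.class);
        conf.setOutputPath(new Path("/tmp/queue-job-output"));      // hypothetical output path

        JobClient.runJob(conf);
    }
}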
> -----Original Message-----
> From: Jim the Standing Bear [mailto:[EMAIL PROTECTED]
> Sent: Tuesday, October 30, 2007 8:04 AM
> To: [email protected]
> Cc: [EMAIL PROTECTED]
> Subject: Re: can jobs be launched recursively within a mapper ?
>
> thank you for all the help. I think I am beginning to get a clearer
> picture of hadoop. I will try the file solution.
>
> On 10/29/07, Aaron Kimball <[EMAIL PROTECTED]> wrote:
> > If you modify the JobConf for a running job within the context of a
> > mapper, the changes will not propagate back to the other machines.
> > JobConfs are serialized to XML and then distributed to the mapping nodes
> > where they are read back into the running Java tasks. There is no
> > "refresh" function that I am aware of.
> >
> > - Aaron
> >
> > Jim the Standing Bear wrote:
> > > Thanks, Stu... Maybe my mind is way off track - but I still sense a
> > > problem with the mapper sending feedback to the job controller. That
> > > is, when a mapper has reached the terminal condition, how can it tell
> > > the job controller to stop?
> > >
> > > If I keep a JobConf object in the mapper, and set a property
> > > "stop.processing" to true when a mapping task has reached the terminal
> > > condition, will it cause synchronization problems? There could be
> > > other mapping tasks that still wish to go on.
> > >
> > > I tried to find a way so that the job controller can open the file in
> > > the output path at the end of the loop to read the contents; but thus
> > > far, I haven't seen a way to achieve this.
> > >
> > > Does this mean I have hit a dead-end?
> > >
> > > -- Jim
> > >
> > > On 10/29/07, Stu Hood <[EMAIL PROTECTED]> wrote:
> > > > The iteration would take place in your control code (your 'main'
> > > > method, as shown in the examples).
> > > >
> > > > In order to prevent records from looping infinitely, each iteration
> > > > would need to use a separate output/input directory.
> > > >
> > > > Thanks,
> > > > Stu
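To make Stu's point concrete, here is a rough sketch of what that outer control loop could look like, again assuming the JobConf/JobClient API of this Hadoop era. The paths are hypothetical, CatalogMapper is a hypothetical mapper (sketched at the bottom of the thread), and the "more-work" marker directory is just one possible base-case test; the essential parts are a fresh output directory per iteration and feeding each step's output back in as the next step's input.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class RecursiveJobDriver {
    public static void main(String[] args) throws IOException {
        Path input = new Path("/catalogs/iteration-0");         // hypothetical seed data
        Path moreWork = new Path("/catalogs/more-work");         // hypothetical marker directory

        for (int iteration = 1; ; iteration++) {
            // a fresh output directory per iteration keeps records from looping forever
            Path output = new Path("/catalogs/iteration-" + iteration);

            JobConf conf = new JobConf(RecursiveJobDriver.class);
            conf.setJobName("catalog-step-" + iteration);
            conf.setInputPath(input);
            conf.setOutputPath(output);
            conf.setMapperClass(CatalogMapper.class);             // sketched further down the thread
            conf.setReducerClass(IdentityReducer.class);
            conf.setOutputKeyClass(LongWritable.class);
            conf.setOutputValueClass(Text.class);

            FileSystem fs = FileSystem.get(conf);
            fs.delete(moreWork);                                  // clear markers from the last round

            JobClient.runJob(conf);                               // blocks until this step finishes

            if (!fs.exists(moreWork)) {
                break;                                            // no task asked for another round
            }
            input = output;   // this step's output becomes the next step's input
        }
    }
}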
> > > > -----Original Message-----
> > > > From: Jim the Standing Bear <[EMAIL PROTECTED]>
> > > > Sent: Monday, October 29, 2007 5:45pm
> > > > To: [email protected]
> > > > Subject: Re: can jobs be launched recursively within a mapper ?
> > > >
> > > > thanks, Owen and David,
> > > >
> > > > I also thought of making a queue so that I can push catalog names onto
> > > > the end of it, while the job control loop keeps removing items off the
> > > > queue until there are none left.
> > > >
> > > > However, the problem is that I don't see how I can do so within the
> > > > map/reduce context. All the code examples are one-shot deals and
> > > > there is no iteration involved.
> > > >
> > > > Furthermore, what David said makes sense, but to avoid an infinite loop,
> > > > the code must remove the record it just read from the input file. How
> > > > do I do that using hadoop's fs? or does hadoop take care of it
> > > > automatically?
> > > >
> > > > -- Jim
> > > >
> > > > On 10/29/07, David Balatero <[EMAIL PROTECTED]> wrote:
> > > > > Aren't these questions a little advanced for a bear to be asking?
> > > > > I'll be here all night...
> > > > >
> > > > > But seriously, if your job is inherently recursive, one possible way
> > > > > to do it would be to make sure that you output in the same format
> > > > > that you input. Then you can keep re-reading the outputted file back
> > > > > into a new map/reduce job, until you hit some base case and you
> > > > > terminate. I've had a main method before that would kick off a bunch
> > > > > of jobs in a row -- but I wouldn't really recommend starting another
> > > > > map/reduce job in the scope of a running map() or reduce() method.
> > > > >
> > > > > - David
> > > > >
> > > > > On Oct 29, 2007, at 2:17 PM, Jim the Standing Bear wrote:
> > > > > > then
> > > >
> > > > --
> > > > --------------------------------------
> > > > Standing Bear Has Spoken
> > > > --------------------------------------

--
--------------------------------------
Standing Bear Has Spoken
--------------------------------------
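Since the thread keeps coming back to the "file solution", here is a rough sketch of the mapper side of it, using the non-generic Mapper interface of this Hadoop generation. As Aaron notes, a map task cannot push a stop.processing flag back through the JobConf, but it can write to HDFS: each task that finds more work drops a marker file into a well-known directory, and the control loop sketched above checks for that directory between iterations. The CatalogMapper name, the paths, and needsFurtherProcessing() are hypothetical placeholders.

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CatalogMapper extends MapReduceBase implements Mapper {

    private JobConf conf;

    public void configure(JobConf job) {
        this.conf = job;
    }

    public void map(WritableComparable key, Writable value,
                    OutputCollector output, Reporter reporter) throws IOException {
        Text record = (Text) value;

        // Emit in the same format as the input so the next iteration can re-read it
        // (the exact key/value layout depends on the chosen Input/OutputFormat).
        output.collect(key, record);

        if (needsFurtherProcessing(record)) {
            // The mapper cannot signal the controller through the JobConf, but it can
            // leave a marker in HDFS. One file per task avoids concurrent writes to the
            // same path; the controller only cares whether the directory exists at all.
            FileSystem fs = FileSystem.get(conf);
            Path marker = new Path("/catalogs/more-work/" + conf.get("mapred.task.id"));
            fs.create(marker).close();
        }
    }

    /** Placeholder for the job's terminal condition, e.g. "this catalog entry
     *  points at another catalog that still has to be fetched". */
    private boolean needsFurtherProcessing(Text record) {
        return false;
    }
}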
