Also, this gives you a solution to your race condition (by using Hadoop's
own mechanisms), and it gives you much higher
throughput, reliability, and scalability than writing to NFS can possibly
give you.


On 1/15/08 12:54 PM, "Miles Osborne" <[EMAIL PROTECTED]> wrote:

> surely the clean way (in a streaming environment) would be to define a
> representation of some kind which serialises the output.
> 
> http://en.wikipedia.org/wiki/Serialization
> 
> after your mappers and reducers have completed, you would then have some
> code which deserialises (unpacks) the output as desired.  this would easily
> allow you to reconstruct the two files from a single set of file
> fragments.
> 
> this approach would entail defining the serialisation / deserialisation
> process separately from the actual mappers / reducers, and then having a
> little compilation step take that definition and generate the necessary
> serialisers / deserialisers; the definition itself then also serves as
> documentation.
> 
> it does have extra overhead, but in the long run it is worth it, since the
> interfaces are actually documented.
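> 
> a minimal sketch of the kind of thing I mean (Python, purely for
> illustration; the record layout here, one "name TAB base64-payload" line
> per file, is just one possible choice):
> 
>     import base64, os, sys
> 
>     def serialise(path):
>         # emit one line per file: "<name>\t<base64 payload>", so each
>         # record stays line oriented and passes through streaming intact
>         payload = base64.b64encode(open(path, 'rb').read()).decode('ascii')
>         sys.stdout.write('%s\t%s\n' % (os.path.basename(path), payload))
> 
>     def deserialise(stream, outdir):
>         # run after the job completes: rebuild the original files
>         for line in stream:
>             name, payload = line.rstrip('\n').split('\t', 1)
>             out = open(os.path.join(outdir, name), 'wb')
>             out.write(base64.b64decode(payload))
>             out.close()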
> 
> Miles
> 
> On 15/01/2008, John Heidemann <[EMAIL PROTECTED]> wrote:
>> 
>> On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote:
>>> 
>>> Regarding the race condition, Hadoop builds task-specific temporary
>>> directories in the output directory, one per reduce task, that hold these
>>> output files (as long as you don't use absolute path names).  When the
>>> process completes successfully, the output files from that temporary
>>> directory are moved to the correct place and the temporary task-specific
>>> directory is deleted.  If the reduce task dies or is superseded by another
>>> task, then the directory is simply deleted.  The file is not kept in
>>> memory pending write.
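>>> 
>>> In streaming terms, a task could place a file into that temporary
>>> directory itself so the same promote-or-delete machinery applies.  A
>>> rough sketch only; I am assuming the job conf is exported into the task
>>> environment with dots turned into underscores, that something like
>>> mapred.work.output.dir names the temporary directory in your release,
>>> and that "hadoop dfs -put" is the right copy command:
>>> 
>>>     import os, subprocess, tempfile
>>> 
>>>     def save_image(image_bytes, name):
>>>         # ASSUMPTION: mapred_work_output_dir is set and points at the
>>>         # task's temporary output directory on HDFS; verify this against
>>>         # your Hadoop version before relying on it
>>>         work_dir = os.environ['mapred_work_output_dir']
>>>         fd, local = tempfile.mkstemp(suffix='.png')
>>>         os.write(fd, image_bytes)
>>>         os.close(fd)
>>>         # anything placed there is promoted on commit and deleted,
>>>         # directory and all, if the task dies or is superseded
>>>         subprocess.call(['hadoop', 'dfs', '-put', local,
>>>                          work_dir + '/' + name])
>>>         os.unlink(local)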
>>> 
>>> I am curious about how to demarcate the image boundaries in your current
>>> output.  Hadoop streaming makes the strong presumption of line
>>> orientation.  If that isn't valid for your output, then you may have a
>>> program that is only accidentally working by finding line boundaries in
>>> binary data.  In particular, you may someday have a situation where some
>>> of the data has one kind of line boundary that is recognized, but on
>>> output the corresponding boundary is generated in a different form.  For
>>> instance, if your program sees CR-LF, it might take the pair as a line
>>> boundary and emit just LF.  Even if this is not happening now, you may be
>>> in for some trouble later.
>> 
>> I think Yuri left out a bit about what we're doing.
>> He wasn't clear about what files we're talking about writing.
>> Let me try to clarify.
>> 
>> As context, all this is in Hadoop streaming.
>> 
>> Here's one way, the "side-effect way" (this is what we're doing now):
>> 
>> In principle, we'd like not to output ANYTHING to stdout from streaming.
>> Instead, we create new files somewhere in the shared Unix filespace.
>> Basically, these files are side-effects of the map/reduce computation.
>> 
>> This approach is described in Dean & Ghemawat section 4.5
>> (Side-effects), with the caveat that the user is responsible for
>> making any side-effects atomic.
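>> 
>> (For us, "atomic" boils down to the usual write-then-rename trick.  A
>> minimal sketch, with the helper name invented for illustration:
>> 
>>     import os, tempfile
>> 
>>     def write_side_effect(final_path, data):
>>         # write under a temporary name in the same directory, then
>>         # rename(); rename is atomic within one filesystem, so readers
>>         # never see a half-written file, and a dead task leaves behind
>>         # only a *.tmp to sweep up
>>         fd, tmp = tempfile.mkstemp(dir=os.path.dirname(final_path),
>>                                    suffix='.tmp')
>>         try:
>>             os.write(fd, data)
>>             os.close(fd)
>>             os.rename(tmp, final_path)
>>         except:
>>             os.unlink(tmp)
>>             raise
>> 
>> That handles torn writes; the files left behind by killed tasks are the
>> part we still have to clean up.)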
>> 
>> Our problem is, I think, that duplicated reducers scheduled for
>> straggler elimination can result in extra, partial side-effect files.
>> We're trying to figure out how to clean them up properly.
>> 
>> Currently it seems that prematurely terminated reducers (due to cancelled
>> straggler-elimination jobs) are not told they are terminated.  They just
>> get a SIGPIPE because their write destination goes away.
>> 
>> This prompted Yuri's first question:
>> 
>>>>>> 1. Is there an easy way to tell, in a script launched by Hadoop
>>>>>>    streaming, if the script was terminated before it received
>>>>>>    complete input?
>> 
>> To me, it seems that cancelled jobs should get a SIGTERM or SIGUSR1 so
>> they can catch it and clean up properly.  Otherwise there seems to be no
>> clean way to distinguish a half-run job from a fully run job that
>> happens to have less input.  (I.e., no way for our reducer to do a
>> commit or abort properly.)
>> 
>> (It would be nicer to send an in-band termination signal down stdin, but
>> I don't think a streaming reducer can do that.)
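>> 
>> (Roughly the handler we would like to be able to write, assuming
>> streaming delivered a catchable signal; process(), cleanup_partial_files()
>> and commit_side_effects() stand in for our own code:
>> 
>>     import signal, sys
>> 
>>     def on_kill(signum, frame):
>>         # we were cancelled: throw away any partial side-effect files
>>         cleanup_partial_files()      # placeholder for our cleanup code
>>         sys.exit(1)
>> 
>>     # SIGPIPE is all we actually see today, and only if we happen to
>>     # write to stdout after it has gone away; SIGTERM or SIGUSR1 is
>>     # what we are asking for
>>     signal.signal(signal.SIGTERM, on_kill)
>>     signal.signal(signal.SIGPIPE, on_kill)
>> 
>>     for line in sys.stdin:
>>         process(line)                # placeholder reducer body
>> 
>>     commit_side_effects()            # reached only on a clean end of input
>> 
>> Without a signal, the loop just sees EOF and the commit at the bottom
>> runs whether or not the input was complete.)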
>> 
>> So what do the Hadoop architects think about side-effects and recovering
>> from half-run jobs?  Does Hadoop intend to support side-effects (for
>> interested users, obviously not as standard practice)?  If we were in
>> Java, would we get a signal we could use to do cleanup?
>> 
>> What do the Hadoop streaming people think?  Is this just a bug, in that
>> streaming is not propagating a signal that appears in Javaland?
>> 
>> 
>> 
>> There's a second way, which is where most of the discussion has gone,
>> call it the "proper" way:
>> 
>> Rather than writing files as side-effects, the argument is to just
>> output the data with the standard hadoop mechanism.  In streaming, this
>> means through stdout.
>> 
>> Which prompted Yuri's second question:
>>>>>> 2. Is there any good way to write multiple HDFS files from a
>>>>>>    streaming script *and have Hadoop clean up those files* when it
>>>>>>    decides to destroy the task?  If there were just one file, I could
>>>>>>    simply use STDOUT, but dumping multiple binary files to STDOUT is
>>>>>>    not pretty.
>> 
>> But I actually think this is not viable for us,
>> because we're writing images which are binary.
>> As per Ted's comment:
>> 
>>> If that isn't valid for your output, then you may have a program that is
>>> only accidentally working by finding line boundaries in binary data.
>> 
>> (Ted, we're not doing it this way right now.)
>> 
>> That said, if it worked, this way is clearly a lot cleaner, since Hadoop
>> already handles commit/abort for half-run jobs.  Basically all of our
>> half-run problems go away.  But they're replaced with file format
>> problems.
>> 
>> If we were in Java, we could write our own OutputRecord class.  This is
>> what
>> Runping suggested and Yuri was discussing.  I don't think that works for
>> us (because we're not in Java, although I suppose it might be made to
>> work).
>> 
>> If we go that way, then we're basically packing many files into one.
>> To me it seems cleanest, if one wants to do that, to use some
>> existing format, like tar or zip or cpio, or maybe the Hadoop multi-file
>> format.  But this way seems fraught with peril, since we have to fight
>> streaming and custom record output, and then still extract the files
>> after output completes anyway.  Lots and lots of work; it feels like
>> this can't be right.
>> 
>> (Another hacky way to make this work in streaming is to convert binary
>> to ASCII, e.g., base64-encode the files.  Been there in SQL.  Done that.
>> Don't want to do it again.  It still has all the encoding and
>> post-processing junk. :-)
>> 
>> 
>> 
>> Yuri had a very clever hack that merges the two schemes.  He writes to
>> random filenames as side-effects, but then writes the side-effect
>> filenames as Hadoop output.  Therefore Hadoop handles commit/abort, and
>> post-run he just collects the files that appear in Hadoop's part-*
>> output and discards the others.
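>> 
>> (The reducer side of that hack is tiny.  A sketch, with the shared
>> directory path made up:
>> 
>>     import os, sys, uuid
>> 
>>     SHARED = '/nfs/images'        # made-up shared filesystem location
>> 
>>     def emit_image(key, image_bytes):
>>         # random name, so a duplicate straggler reducer cannot collide
>>         path = os.path.join(SHARED,
>>                             '%s-%s.png' % (key, uuid.uuid4().hex))
>>         f = open(path, 'wb')
>>         f.write(image_bytes)
>>         f.close()
>>         # the only thing Hadoop commits is this line naming the file
>>         sys.stdout.write('%s\t%s\n' % (key, path))
>> 
>> The post-run sweep then compares what is in SHARED against the filenames
>> in the committed part-* output and deletes everything unreferenced.)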
>> 
>> This hack works, but IMHO the reducer should do the commit/abort of
>> side-effects, not some post-processing job.
>> 
>> 
>> So any thoughts about supporting side-effects?
>> 
>> 
>>    -John
>> 
