On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote: 
>
>Regarding the race condition, hadoop builds task specific temporary
>directories in the output directory, one per reduce task, that hold these
>output files (as long as you don't use absolute path names).  When the
>process completes successfully, the output files from that temporary
>directory are moved to the correct place and the temporary task-specific
>directory is deleted.  If the reduce task dies or is superceded by another
>task, then the directory is simply deleted.  The file is not kept in memory
>pending write.
>
>I am curious about how to demarcate the image boundaries in your current
>output.  Hadoop streaming makes the strong presumption of line orientation.
>If that isn't valid for your output, then you may have a program that is
>only accidentally working by finding line boundaries in binary data.  In
>particular, you may someday have a situation where some of the data has one
>kind of line boundary that is recognized, but on output the corresponding
>boundary is generated in a different form.  For instance, if your program
>sees CR-LF, it might take the pair as a line boundary and emit just LF.
>Even if this is not happening now, you may be in for some trouble
>later.

I think Yuri left out a bit about what we're doing.
He wasn't clear about what files we're talking about writing.
Let me try to clarify.

As context, all this is in Hadoop streaming.

Here's one way, the "side-effect way" (this is what we're doing now):

In principle, we'd like to not output ANYTHING to stdout from streaming.
Instead, we create new files somewhere in the shared Unix filespace.
Basically, these files are side-effects of the map/reduce computation.

This approach is described in Dean & Ghemawat section 4.5
(Side-effects), with the caveat that the user must be responsible for
making any side-effect atomic.
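
To make "atomic" concrete, here is a minimal sketch of one common way to do
it on a local POSIX filesystem: write to a temporary name in the same
directory, then rename into place. The function name and the ".partial-"
prefix are made up for illustration, and rename semantics on NFS or other
shared filesystems may be weaker than what this assumes.

```python
import os
import tempfile


def write_side_effect_atomically(final_path, data):
    """Write bytes so that final_path holds either the whole file or nothing."""
    directory = os.path.dirname(os.path.abspath(final_path))
    # Create the temporary file next to the target so the rename below
    # never crosses a filesystem boundary.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".partial-")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Commit: on a local POSIX filesystem the rename is atomic.
        os.rename(tmp_path, final_path)
    except BaseException:
        # Abort: drop the partial file and re-raise the original error.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise
```

A task that is killed outright never reaches the cleanup branch, but what it
leaves behind carries the ".partial-" prefix and can be swept up later.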

Our problem is, I think, that duplicated reducers scheduled for
straggler elimination can result in extra, partial side-effect files.
We're trying to figure out how to clean them up properly.

Currently it seems that prematurely terminated reducers (due to canceled
straggler elimination jobs) are not told they are terminated.  They just
get a SIGPIPE because their write destination goes away.

This prompted Yuri's first question:

>>>> 1. Is there an easy way to tell in a script launched by the Hadoop
>>>>    streaming, if the script was terminated before it received complete
>>>>    input?

To me, it seems that canceled jobs should get a SIGTERM or SIGUSR1 so
they can catch it and clean up properly.  Otherwise there seems to be no
clean way to distinguish a half-run job from a fully run job that
happens to have less input.  (I.e., no way for our reducer to do a
commit or abort properly.)
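
If such a signal were delivered, the reducer side could look roughly like
the sketch below. This assumes that streaming forwards SIGTERM or SIGUSR1 to
the script, which as far as I can tell it does not do today; the names
partial_files, abort, and install_handlers are hypothetical.

```python
import os
import signal
import sys

# Paths the reducer has created so far but not yet committed.
partial_files = []


def abort(signum, frame):
    """Remove uncommitted side-effect files, then exit with a failure status."""
    for path in partial_files:
        try:
            os.unlink(path)
        except OSError:
            pass  # already gone, nothing to clean up
    sys.exit(1)


def install_handlers():
    """Route the (assumed) termination signals to the abort routine."""
    signal.signal(signal.SIGTERM, abort)
    signal.signal(signal.SIGUSR1, abort)
```

The reducer would call install_handlers() at startup, append each file it
opens to partial_files, and remove the entry once that file is committed.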

(It would be nicer to send an in-band termination signal down stdin, but
I don't think a streaming reducer can do that.)

So what do the Hadoop architects think about side-effects and recovering
from half-run jobs?  Does hadoop intend to support side-effects (for
interested users, obviously not as standard practice)?  If we were in
Java would we get a signal we could use to do cleanup?

What do the Hadoop streaming people think?  Is this just a bug that
streaming is not propagating a signal that appears in Javaland?



There's a second way, which is where most of the discussion has gone,
call it the "proper" way:

Rather than writing files as side-effects, the argument is to just
output the data with the standard hadoop mechanism.  In streaming, this
means through stdout.

Which prompted Yuri's second question:
>>>> 2. Is there any good way to write multiple HDFS files from a streaming
>>>>    script *and have Hadoop cleanup those files* when it decides to
>>>>    destroy the task?  If there was just one file, I could simply use
>>>>    STDOUT, but dumping multiple binary files to STDOUT is not pretty.

But I actually think this is not viable for us,
because we're writing images which are binary.
As per Ted's comment:

>If that isn't valid for your output, then you may have a program that is
>only accidentally working by finding line boundaries in binary data. 

(Ted, we're not doing it this way right now.)

That said, if it worked, this way is clearly a lot cleaner, since Hadoop
already handles commit/abort for half-run jobs.  Basically all of our
half-run problems go away.  But they're replaced with File Format
Problems.

If we were in Java, we could write our own OutputFormat class.  This is what
Runping suggested and Yuri was discussing.  I don't think that works for
us (because we're not in Java, although I suppose it might be made to
work).

If we go that way, then we're basically packing many files into one.
To me it seems cleanest, if one wants to do that, to use some
existing format, like tar or zip or cpio, or maybe the hadoop multi-file
format.  But this way seems fraught with peril, since we have to fight
streaming and custom record output, and then still extract the files
after output completes anyway.  Lots and lots of work---it feels like
this can't be right.

(Another hacky way to make this work in streaming is to convert binary to
ascii, like base-64-ize the files.  Been there in SQL.  Done that.
Don't want to do it again.  It still has all the encoding and
post-processing junk. :-)
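
For the record, the base-64 variant would look something like this sketch.
The record layout (file name, tab, base64 payload, newline) is my own
assumption, not anything streaming prescribes, and the function names are
made up.

```python
import base64


def encode_record(name, data):
    """Return one line: file name, a tab, then the bytes as base64 text."""
    # b64encode emits no line breaks, so the payload stays on one line.
    payload = base64.b64encode(data).decode("ascii")
    return name + "\t" + payload + "\n"


def decode_record(line):
    """Invert encode_record: return (file name, original bytes)."""
    name, payload = line.rstrip("\n").split("\t", 1)
    return name, base64.b64decode(payload)
```

The reducer would print encode_record(...) for each image, and a
post-processing step would run decode_record over every line of the part-*
files to get the images back.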


        
Yuri had a very clever hack that merges the two schemes.  He writes to
random filenames as side-effects, but then writes the side-effect
filenames as hadoop output.  Therefore Hadoop handles commit/abort, and
post run he just collects the files that appear in Hadoop's part-*
output and discards the others.
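
The post-run collection step can be sketched as below. This is my reading of
the scheme, not Yuri's actual script: it assumes the part-* files are
readable as local files, that each line holds one bare side-effect file
name, and that all side-effect files sit in one flat directory. Both
function names are hypothetical.

```python
import glob
import os


def committed_names(output_dir):
    """Collect the side-effect file names listed in the job's part-* files."""
    names = set()
    for part in glob.glob(os.path.join(output_dir, "part-*")):
        with open(part) as f:
            for line in f:
                # strip() also drops a trailing tab left by a key-only record.
                name = line.strip()
                if name:
                    names.add(name)
    return names


def discard_uncommitted(side_effect_dir, committed):
    """Delete side-effect files that no committed task reported."""
    removed = []
    for name in sorted(os.listdir(side_effect_dir)):
        if name not in committed:
            os.unlink(os.path.join(side_effect_dir, name))
            removed.append(name)
    return removed
```

Usage would be discard_uncommitted(side_dir, committed_names(out_dir)) once
the job has finished.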

This hack works, but IMHO the reducer should do the commit/abort of
side-effects, not some post-processing job.


So any thoughts about supporting side-effects?


   -John
