Also, this gives you a solution to your race condition (by using Hadoop's mechanisms), and it gives you much higher throughput, reliability, and scalability than writing to NFS can possibly give you.
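
If you do want to keep producing the files as side effects, the trick is to write them under the task's own temporary output directory rather than to a fixed NFS path, so that the framework's normal commit/cleanup applies to them as well. Roughly, a streaming reducer could do something like the sketch below. This is only a sketch: it assumes streaming exports the job configuration to the child environment with dots turned into underscores (so mapred.work.output.dir shows up as $mapred_work_output_dir), and render() is just a placeholder for whatever produces your images; check the property name against your Hadoop version.

  #!/usr/bin/env python
  # Sketch of a streaming reducer that writes its image file as a "side
  # effect" into the task's temporary output area, so Hadoop promotes the
  # file on commit and discards it if this attempt is killed.  Assumes the
  # job conf reaches the child environment with dots replaced by
  # underscores; verify against your Hadoop version.
  import os
  import sys

  work_dir = os.environ["mapred_work_output_dir"]     # task-private output dir
  task_id = os.environ.get("mapred_task_id", "local")

  def render(record):
      # placeholder for the real image-producing code
      return record

  local_name = "images-%s.dat" % task_id
  out = open(local_name, "wb")
  for line in sys.stdin:
      record = line.rstrip("\n")          # streaming hands us "key \t value"
      out.write(render(record))
  out.close()

  # Copy the finished file into the task's temporary output area; the
  # framework moves it to the real output directory only if this attempt
  # is the one that commits.
  rc = os.system("hadoop dfs -put %s %s/%s" % (local_name, work_dir, local_name))
  sys.exit(0 if rc == 0 else 1)

The same idea works on the map side; the important part is that nothing lands outside the attempt's own directory until the framework promotes it.
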
On 1/15/08 12:54 PM, "Miles Osborne" <[EMAIL PROTECTED]> wrote:

> surely the clean way (in a streaming environment) would be to define a
> representation of some kind which serialises the output.
>
> http://en.wikipedia.org/wiki/Serialization
>
> after your mappers and reducers have completed, you would then have some
> code which deserialised (unpacked) the output as desired. this would
> easily allow you to reconstruct the two files from a single (set) of
> file fragments.
>
> this approach would entail defining the serialisation / deserialisation
> process in a way which was distinct from the actual mappers / reducers
> and then having a little compilation process take that definition and
> both create the necessary serialisers / deserialisers and also serve as
> documentation.
>
> it does have extra overhead, but in the long run it is worth it, since
> the interfaces are actually documented.
>
> Miles
>
> On 15/01/2008, John Heidemann <[EMAIL PROTECTED]> wrote:
>>
>> On Tue, 15 Jan 2008 09:09:07 PST, Ted Dunning wrote:
>>>
>>> Regarding the race condition, Hadoop builds task-specific temporary
>>> directories in the output directory, one per reduce task, that hold
>>> these output files (as long as you don't use absolute path names).
>>> When the process completes successfully, the output files from that
>>> temporary directory are moved to the correct place and the temporary
>>> task-specific directory is deleted. If the reduce task dies or is
>>> superseded by another task, then the directory is simply deleted. The
>>> file is not kept in memory pending write.
>>>
>>> I am curious about how to demarcate the image boundaries in your
>>> current output. Hadoop streaming makes the strong presumption of line
>>> orientation. If that isn't valid for your output, then you may have a
>>> program that is only accidentally working by finding line boundaries
>>> in binary data. In particular, you may someday have a situation where
>>> some of the data has one kind of line boundary that is recognized, but
>>> on output the corresponding boundary is generated in a different form.
>>> For instance, if your program sees CR-LF, it might take the pair as a
>>> line boundary and emit just LF. Even if this is not happening now,
>>> you may be in for some trouble later.
>>
>> I think Yuri left out a bit about what we're doing.
>> He wasn't clear about what files we're talking about writing.
>> Let me try to clarify.
>>
>> As context, all this is in Hadoop streaming.
>>
>> Here's one way, the "side-effect way" (this is what we're doing now):
>>
>> In principle, we'd like to not output ANYTHING to stdout from streaming.
>> Instead, we create new files somewhere in the shared Unix filespace.
>> Basically, these files are side-effects of the map/reduce computation.
>>
>> This approach is described in Dean & Ghemawat section 4.5
>> (Side-effects), with the caveat that the user must be responsible for
>> making any side-effect atomic.
>>
>> Our problem is, I think, that duplicated reducers scheduled for
>> straggler elimination can result in extra, partial side-effect files.
>> We're trying to figure out how to clean them up properly.
>>
>> Currently it seems that prematurely terminated reducers (due to
>> canceled straggler-elimination jobs) are not told they are terminated.
>> They just get a SIGPIPE because their write destination goes away.
>>
>> This prompted Yuri's first question:
>>
>>>>>> 1. Is there an easy way to tell, in a script launched by Hadoop
>>>>>> streaming, if the script was terminated before it received
>>>>>> complete input?
>>
>> To me, it seems that canceled jobs should get a SIGTERM or SIGUSR1 so
>> they can catch it and clean up properly. Otherwise there seems to be
>> no clean way to distinguish a half-run job from a fully run job that
>> happens to have less input. (I.e., no way for our reducer to do a
>> commit or abort properly.)
>>
>> (It would be nicer to send an in-band termination signal down stdin,
>> but I don't think a streaming reducer can do that.)
>>
>> So what do the Hadoop architects think about side-effects and
>> recovering from half-run jobs? Does Hadoop intend to support
>> side-effects (for interested users, obviously not as standard
>> practice)? If we were in Java, would we get a signal we could use to
>> do cleanup?
>>
>> What do the Hadoop streaming people think? Is this just a bug, in that
>> streaming is not propagating a signal that appears in Javaland?
>>
>>
>> There's a second way, which is where most of the discussion has gone;
>> call it the "proper" way:
>>
>> Rather than writing files as side-effects, the argument is to just
>> output the data with the standard Hadoop mechanism. In streaming, this
>> means through stdout.
>>
>> Which prompted Yuri's second question:
>>>>>> 2. Is there any good way to write multiple HDFS files from a
>>>>>> streaming script *and have Hadoop clean up those files* when it
>>>>>> decides to destroy the task? If there was just one file, I could
>>>>>> simply use STDOUT, but dumping multiple binary files to STDOUT is
>>>>>> not pretty.
>>
>> But I actually think this is not viable for us, because we're writing
>> images, which are binary. As per Doug's comment:
>>
>>> If that isn't valid for your output, then you may have a program that
>>> is only accidentally working by finding line boundaries in binary
>>> data.
>>
>> (Doug, we're not doing it this way right now.)
>>
>> That said, if it worked, this way is clearly a lot cleaner, since
>> Hadoop already handles commit/abort for half-run jobs. Basically all
>> of our half-run problems go away. But they're replaced with File
>> Format Problems.
>>
>> If we were in Java, we could write our own OutputRecord class. This is
>> what Runping suggested and Yuri was discussing. I don't think that
>> works for us (because we're not in Java, although I suppose it might
>> be made to work).
>>
>> If we go that way, then we're basically packing many files into one.
>> To me it seems cleanest, if one wants to do that, to use some existing
>> format, like tar or zip or cpio, or maybe the Hadoop multi-file
>> format. But this way seems fraught with peril, since we have to fight
>> streaming and custom record output, and then still extract the files
>> after output completes anyway. Lots and lots of work---it feels like
>> this can't be right.
>>
>> (Another, hackier way to make this work in streaming is to convert
>> binary to ASCII, e.g., base-64-ize the files. Been there in SQL. Done
>> that. Don't want to do it again. It still has all the encoding and
>> post-processing junk. :-)
>>
>>
>> Yuri had a very clever hack that merges the two schemes. He writes to
>> random filenames as side-effects, but then writes the side-effect
>> filenames as Hadoop output. Therefore Hadoop handles commit/abort,
>> and post-run he just collects the files that appear in Hadoop's part-*
>> output and discards the others.
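
For what it's worth, the post-run collection step in that scheme is only a few lines: treat the filenames that survive in the part-* files as the commit record and delete everything else. A rough sketch follows; the directory names and the one-filename-per-line output format are just assumptions for illustration, and it presumes the side-effect files sit on a shared filesystem the script can reach directly.

  #!/usr/bin/env python
  # Post-run cleanup for the "emit the side-effect filename" hack: keep
  # only the files whose names were committed to the job's part-* output,
  # delete the rest.  SIDE_EFFECT_DIR and OUTPUT_DIR are illustrative.
  import glob
  import os

  SIDE_EFFECT_DIR = "/shared/images"      # where the reducers dropped files
  OUTPUT_DIR = "/shared/job-output"       # part-* files copied out of HDFS

  committed = set()
  for part in glob.glob(os.path.join(OUTPUT_DIR, "part-*")):
      for line in open(part):
          # assume each reducer printed "key \t filename" (or a bare filename)
          committed.add(os.path.basename(line.rstrip("\n").split("\t")[-1]))

  for path in glob.glob(os.path.join(SIDE_EFFECT_DIR, "*")):
      if os.path.basename(path) not in committed:
          os.remove(path)                 # leftover from a killed or duplicate attempt

But, as John says next, it would be cleaner if the reducer itself could do the abort.
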
>>
>> This hack works, but IMHO the reducer should do the commit/abort of
>> side-effects, not some post-processing job.
>>
>>
>> So any thoughts about supporting side-effects?
>>
>>
>> -John
>>
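
On that reducer-side commit/abort: if the streaming child did reliably get a SIGTERM (or even just the SIGPIPE it gets today when its stdout goes away), the cleanup itself is easy to write. A rough sketch follows; it only helps if the signal actually reaches the script, which is exactly the guarantee in question here, and the path and process() helper are made up for illustration.

  #!/usr/bin/env python
  # Sketch of reducer-side abort handling: discard the partial side-effect
  # file if the task is torn down before stdin has been read to EOF.
  import os
  import signal
  import sys

  partial = "/shared/images/reduce-%d.tmp" % os.getpid()   # illustrative path

  def abort(signum, frame):
      # half-run task: remove the partial side effect and exit non-zero
      if os.path.exists(partial):
          os.remove(partial)
      sys.exit(1)

  signal.signal(signal.SIGTERM, abort)    # what this thread asks Hadoop to send
  signal.signal(signal.SIGPIPE, abort)    # fires only on a later write to the
                                          # dead stdout pipe

  def process(line):
      # placeholder for the real per-record work
      return line

  out = open(partial, "wb")
  for line in sys.stdin:
      out.write(process(line))
  out.close()

  # Reached only if stdin hit EOF: "commit" by renaming into place.
  os.rename(partial, partial.replace(".tmp", ""))

Even then, the rename-as-commit step is still racy against a duplicate attempt, which is why having the framework do the promotion (the temporary-output-directory mechanism described earlier in the thread) remains the more attractive option.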