We've specifically disabled speculative execution for this job, so this shouldn't happen, but of course in the general case the implementation of SequenceFilePerKeyOutput could append a UUID or a similar arbitrary distinguishing string to the temporary output to avoid any collisions (which would be good practice anyway).
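For illustration, a minimal sketch of that suffixing, assuming the temporary path is assembled as a plain string (the buildTemporaryPath helper is hypothetical, not part of the job):

import java.util.UUID;

// Hypothetical helper: append a random UUID so that two attempts writing
// output for the same key can never collide on the temporary path.
private String buildTemporaryPath(String temporaryDirectory, String key) {
  return temporaryDirectory + "/" + key + "-" + UUID.randomUUID();
}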
On 25 November 2015 at 14:44, Micah Whitacre <[email protected]> wrote:

> So with the exception of sequence files and protobufs, I think what you
> are trying to achieve is very similar to what Kite does with Partitioned
> Datasets [1]. Once again that gets you back to Avro, though. Its API is
> a little simpler, but maybe something to emulate if we wanted to build
> similar functionality into Crunch, or for a sequence-file-per-key option.
>
> When you are writing the output, does using the time then guarantee the
> uniqueness of the output? Just wondering, because with the rename you
> might get odd behaviour if there are collisions.
>
> [1] - http://kitesdk.org/docs/1.1.0/Partitioned-Datasets.html
>
> On Wed, Nov 25, 2015 at 6:02 AM, David Whiting <[email protected]>
> wrote:
>
> > I currently have a requirement to write out event data split into a
> > directory tree by the category and time of the event (something I
> > think is a fairly common use case). There is already support for this
> > in Crunch in the Avro world with AvroFilePerKeyTarget, but we're using
> > protobufs in sequence files round here. After investigating the
> > built-in Hadoop MultipleOutputs mechanism, I decided it was a) very
> > non-intuitive to use, b) difficult to ensure atomicity with, and c)
> > difficult to integrate with Crunch. What I did instead was to open the
> > files and write them from within the reduce tasks themselves, and have
> > the reduce tasks output simply a Pair<String, String> of temporary
> > filename and final filename. That way, if any part fails, you won't
> > have any output. After the job completes, you simply materialize the
> > output and perform a series of HDFS move operations to move the files
> > into their final location. If something fails, you can delete all the
> > temporary files and start again.
> >
> > Usage looks a bit like this:
> >
> > private Map<String, String> splitEventsToTemporaryDirectory(
> >     Path temporaryDirectory,
> >     PTable<String, Pair<Long, ByteBuffer>> events) {
> >
> >   // Final output files will be written to
> >   // hdfs://{keyValue}/{leafFilename}
> >   String leafFilename = "run_" +
> >       DateTime.now(DateTimeZone.UTC).toString("yyyy-MM-dd_HH-mm-ss");
> >   return events
> >       .groupByKey()
> >       .parallelDo(
> >           new SequenceFilePerKeyOutputFn<>(
> >               temporaryDirectory.toString(),
> >               leafFilename,
> >               longs(), bytes()),
> >           tableOf(strings(), strings()))
> >       .materializeToMap();
> > }
> >
> > private void moveFilesToFinalLocation(
> >     Map<String, String> temporaryFinalPaths) throws IOException {
> >   for (Map.Entry<String, String> temporaryFinalPath :
> >       temporaryFinalPaths.entrySet()) {
> >     Path temporaryPath = new Path(temporaryFinalPath.getKey());
> >     Path finalPath = new Path(temporaryFinalPath.getValue());
> >     info("Renaming " + temporaryPath + " to " + finalPath);
> >     fs.mkdirs(finalPath.getParent());
> >     fs.rename(temporaryPath, finalPath);
> >   }
> > }
> >
> > This is all working nicely in production at the moment, but I have two
> > follow-up questions for the Crunch community:
> > * Is this a sane thing to do? Will it cause any problems that I
> >   haven't thought about?
> > * Would this be a useful thing to contribute to Crunch itself? If so,
> >   what would the API look like?
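The thread never shows SequenceFilePerKeyOutputFn itself, so the following is only a rough, non-generic sketch of what such a DoFn might look like. It hard-codes LongWritable/BytesWritable rather than deriving them from the longs()/bytes() PTypes passed to the real constructor, and the path layout inside it is an assumption:

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;

public class SequenceFilePerKeyOutputFn
    extends DoFn<Pair<String, Iterable<Pair<Long, ByteBuffer>>>,
                 Pair<String, String>> {

  private final String temporaryDirectory;
  private final String leafFilename;

  public SequenceFilePerKeyOutputFn(String temporaryDirectory,
                                    String leafFilename) {
    this.temporaryDirectory = temporaryDirectory;
    this.leafFilename = leafFilename;
  }

  @Override
  public void process(
      Pair<String, Iterable<Pair<Long, ByteBuffer>>> input,
      Emitter<Pair<String, String>> emitter) {
    String key = input.first();
    // Assumed layout: write under the temporary directory first; the
    // final path only comes into existence via the rename step after
    // the whole job has succeeded.
    String temporaryPath = temporaryDirectory + "/" + key + "/" + leafFilename;
    String finalPath = key + "/" + leafFilename;
    try (SequenceFile.Writer writer = SequenceFile.createWriter(
        getConfiguration(),  // supplied to the DoFn by Crunch at runtime
        SequenceFile.Writer.file(new Path(temporaryPath)),
        SequenceFile.Writer.keyClass(LongWritable.class),
        SequenceFile.Writer.valueClass(BytesWritable.class))) {
      for (Pair<Long, ByteBuffer> event : input.second()) {
        ByteBuffer buffer = event.second().duplicate();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        writer.append(new LongWritable(event.first()),
                      new BytesWritable(bytes));
      }
    } catch (IOException e) {
      throw new RuntimeException("Failed writing " + temporaryPath, e);
    }
    // Emit the (temporary, final) pair that the driver later materializes
    // and uses for the HDFS renames.
    emitter.emit(Pair.of(temporaryPath, finalPath));
  }
}

In the real class the trailing PType constructor arguments would presumably drive the key/value serialization instead of the hard-coded Writable classes above.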
