[ https://issues.apache.org/jira/browse/CRUNCH-515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963338#comment-14963338 ]
Sean Owen commented on CRUNCH-515:
----------------------------------

I have a patch that easily implements {{finalize()}} (attached -- do not merge!), but it either uncovers some latent bugs here and there or else this actually isn't allowed given the semantics of {{done()}}. Have a look at {{ToolRunnerIT.FakeTextTool}} for example, which calls {{done()}} but then needs the temp files to exist to compute the tested result:

{code}
@Override
public int run(String[] strings) throws Exception {
  String urlsFile = strings[0];
  String outFile = strings[1];

  Pipeline pipeline = new MRPipeline(ToolRunnerIT.class, getConf());
  PCollection<String> urls = pipeline.read(
      new TextFileTableSource<String, String>(urlsFile, tableOf(strings(), strings()))).values();
  pipeline.write(urls, new TextFileTarget(outFile));
  pipeline.done();

  // The pipeline is used again after done(): length() materializes its result,
  // which needs a new temp path (DistributedPipeline.createTempPath in the trace).
  PCollection<String> stringPCollection = pipeline.readTextFile(outFile);
  assertThat(stringPCollection.length().getValue(), is(greaterThan(0L)));
  return 0;
}
{code}

{code}
java.lang.NullPointerException: null
    at org.apache.hadoop.fs.Path.<init>(Path.java:105)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.crunch.impl.dist.DistributedPipeline.createTempPath(DistributedPipeline.java:395)
    at org.apache.crunch.impl.dist.DistributedPipeline.createIntermediateOutput(DistributedPipeline.java:390)
    at org.apache.crunch.impl.dist.DistributedPipeline.getMaterializeSourceTarget(DistributedPipeline.java:360)
    at org.apache.crunch.impl.mr.MRPipeline.materialize(MRPipeline.java:171)
    at org.apache.crunch.impl.dist.collect.PCollectionImpl.materialize(PCollectionImpl.java:113)
    at org.apache.crunch.materialize.pobject.PObjectImpl.<init>(PObjectImpl.java:56)
    at org.apache.crunch.materialize.pobject.FirstElementPObject.<init>(FirstElementPObject.java:38)
    at org.apache.crunch.lib.Aggregate.length(Aggregate.java:95)
    at org.apache.crunch.impl.dist.collect.PCollectionImpl.length(PCollectionImpl.java:374)
    at org.apache.crunch.io.ToolRunnerIT$FakeTextTool.run(ToolRunnerIT.java:151)
    at org.apache.crunch.io.ToolRunnerIT.textRunWithoutToolRunner(ToolRunnerIT.java:87)
{code}

What do you think?

> Decrease probability of collision on Crunch temp directories
> ------------------------------------------------------------
>
>                 Key: CRUNCH-515
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-515
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>    Affects Versions: 0.8.4, 0.11.0
>            Reporter: Ben Roling
>            Assignee: Josh Wills
>         Attachments: CRUNCH-515-1.patch
>
>
> I've heard reports of Crunch pipelines at our organization failing due to
> collisions on temp directories.
> Take the following stack trace from an old internal email thread I dug up as
> an example:
> {noformat}
> 2015-04-02 04:45:49,208 INFO org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob:
> org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory /tmp/crunch-686245394/p2/output already exists
>         at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
>         at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1013)
>         at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:974)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:394)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
>         at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:974)
>         at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>         at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>         at java.lang.Thread.run(Thread.java:682)
> {noformat}
> What we found in this case is that the pre-existing directory was rather old.
> It hung around because we're doing a poor job of cleaning old garbage out of
> our HDFS /tmp directory. We intend to set up a job that deletes anything older
> than a couple of weeks or so from /tmp, but I think the chance of a collision
> will still be high enough that failures like this might happen on occasion.
> The temp directory name Crunch chooses contains a random 31-bit value:
> https://github.com/apache/crunch/blob/apache-crunch-0.11.0/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java#L326
> I say 31-bit because it comes from a 32-bit random integer but only includes
> positive values, thereby excluding 1 bit.
> The following blog post shows some probabilities for 32-bit hash collisions,
> which is essentially the same problem:
> http://preshing.com/20110504/hash-collision-probabilities/
> Since we're dealing with 31 bits instead of 32, the probabilities will be
> higher than those given there for 32 bits. Even with 32 bits, the probability
> of a collision is 1 in 100 with just 9292 values; halving the space to 31 bits
> roughly doubles that, to about 1 in 50.
> I have not done any thorough investigation to understand why, but in our
> production environment we have a lot of Crunch jobs and we are leaving
> 200-300 stray Crunch temp directories per day. Depending on how aggressively
> a scheduled job cleans old files out of /tmp, we could still have a realistic
> chance of hitting a collision.
> My proposal is to change the random integer component of the temp path to a
> UUID or something similar, making a collision drastically less likely
> regardless of whether "/tmp" is ever cleaned up.
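The collision figures quoted in the issue follow from the birthday bound. The sketch below (class and method names are hypothetical, not part of Crunch) estimates the chance that at least two of n uniformly random names collide in a space of 2^bits values, using the standard approximation p ≈ 1 - exp(-n(n-1) / 2N). The 122-bit case is included only for comparison, assuming names drawn from the random bits of a version-4 UUID.

```java
/**
 * Hypothetical helper: birthday-bound estimate of the probability that at
 * least two of n names, each drawn uniformly from 2^bits values, are equal.
 * Uses the approximation p ~= 1 - exp(-n(n-1) / (2N)).
 */
public class TempDirCollisionOdds {

    static double collisionProbability(long n, int bits) {
        double space = Math.pow(2, bits);            // N = number of possible names
        double pairs = n * (double) (n - 1) / 2.0;   // number of distinct pairs of names
        // -expm1(-x) equals 1 - exp(-x) but stays accurate when x is tiny
        return -Math.expm1(-pairs / space);
    }

    public static void main(String[] args) {
        long n = 9292;
        System.out.printf("32 bits,  n=%d: %.4f%n", n, collisionProbability(n, 32));
        System.out.printf("31 bits,  n=%d: %.4f%n", n, collisionProbability(n, 31));
        // 122 random bits, as carried by a version-4 UUID
        System.out.printf("122 bits, n=%d: %.3e%n", n, collisionProbability(n, 122));
    }
}
```

At n = 9292 this gives roughly 1% for 32 bits and roughly 2% for 31 bits, while the 122-bit figure is on the order of 1e-29, which is why the size of the random component matters far more than how often /tmp is cleaned.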
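To make the proposal concrete, here is a minimal sketch contrasting the two naming schemes discussed in the issue. Both helpers are hypothetical and are not Crunch's actual code: the "crunch-" prefix mirrors the path in the stack trace (/tmp/crunch-686245394), and masking off the sign bit is just one way to produce the non-negative 31-bit value the issue describes; the real DistributedPipeline code may differ.

```java
import java.util.Random;
import java.util.UUID;

/**
 * Hypothetical helpers contrasting two ways of naming a per-pipeline temp
 * directory under a shared base such as /tmp. Illustrative only; not Crunch's API.
 */
public class TempDirNames {

    private static final Random RANDOM = new Random();

    /** Current-style scheme: a random int with the sign bit masked off (0 .. 2^31 - 1). */
    static String randomIntName(String base) {
        int suffix = RANDOM.nextInt() & Integer.MAX_VALUE;
        return base + "/crunch-" + suffix;
    }

    /** Proposed scheme: a random (version 4) UUID, which carries 122 random bits. */
    static String uuidName(String base) {
        return base + "/crunch-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(randomIntName("/tmp"));
        System.out.println(uuidName("/tmp"));
    }
}
```

The UUID variant keeps the same directory layout, so only the generated name changes; stray directories would still need periodic cleanup to reclaim space, but no longer to avoid collisions.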
-- This message was sent by Atlassian JIRA (v6.3.4#6332)