[
https://issues.apache.org/jira/browse/CRUNCH-515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963338#comment-14963338
]
Sean Owen commented on CRUNCH-515:
----------------------------------
I have a patch that easily implements {{finalize()}} (attached -- do not
merge!) but it either uncovers some latent bugs here and there or else this
actually isn't allowed given the semantics of {{done()}}.
Have a look at {{ToolRunnerIT.FakeTextTool}} for example, which calls
{{done()}} but then needs the temp files to exist to compute the tested result:
{code}
@Override
public int run(String[] strings) throws Exception {
String urlsFile = strings[0];
String outFile = strings[1];
Pipeline pipeline = new MRPipeline(ToolRunnerIT.class, getConf());
PCollection<String> urls = pipeline.read(
new TextFileTableSource<String, String>(urlsFile, tableOf(strings(),
strings()))).values();
pipeline.write(urls, new TextFileTarget(outFile));
pipeline.done();
PCollection<String> stringPCollection = pipeline.readTextFile(outFile);
assertThat(stringPCollection.length().getValue(), is(greaterThan(0L)));
return 0;
}
{code}
{code}
java.lang.NullPointerException: null
at org.apache.hadoop.fs.Path.<init>(Path.java:105)
at org.apache.hadoop.fs.Path.<init>(Path.java:94)
at
org.apache.crunch.impl.dist.DistributedPipeline.createTempPath(DistributedPipeline.java:395)
at
org.apache.crunch.impl.dist.DistributedPipeline.createIntermediateOutput(DistributedPipeline.java:390)
at
org.apache.crunch.impl.dist.DistributedPipeline.getMaterializeSourceTarget(DistributedPipeline.java:360)
at org.apache.crunch.impl.mr.MRPipeline.materialize(MRPipeline.java:171)
at
org.apache.crunch.impl.dist.collect.PCollectionImpl.materialize(PCollectionImpl.java:113)
at
org.apache.crunch.materialize.pobject.PObjectImpl.<init>(PObjectImpl.java:56)
at
org.apache.crunch.materialize.pobject.FirstElementPObject.<init>(FirstElementPObject.java:38)
at org.apache.crunch.lib.Aggregate.length(Aggregate.java:95)
at
org.apache.crunch.impl.dist.collect.PCollectionImpl.length(PCollectionImpl.java:374)
at
org.apache.crunch.io.ToolRunnerIT$FakeTextTool.run(ToolRunnerIT.java:151)
at
org.apache.crunch.io.ToolRunnerIT.textRunWithoutToolRunner(ToolRunnerIT.java:87)
{code}
What do you think?
> Decrease probability of collision on Crunch temp directories
> ------------------------------------------------------------
>
> Key: CRUNCH-515
> URL: https://issues.apache.org/jira/browse/CRUNCH-515
> Project: Crunch
> Issue Type: Improvement
> Components: Core
> Affects Versions: 0.8.4, 0.11.0
> Reporter: Ben Roling
> Assignee: Josh Wills
> Attachments: CRUNCH-515-1.patch
>
>
> I've heard reports of failures of Crunch pipelines at our organization due to
> collision on temp directories.
> Take the following stack trace from an old internal email thread I dug up as
> an example:
> {noformat}
> 2015-04-02 04:45:49,208 INFO
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob:
> org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory
> /tmp/crunch-686245394/p2/output already exists
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
> at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:1013)
> at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:974)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:394)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
> at
> org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:974)
> at org.apache.hadoop.mapreduce.Job.submit(Job.java:582)
> at
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
> at
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
> at
> org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
> at
> org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
> at
> org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
> at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
> at java.lang.Thread.run(Thread.java:682)
> {noformat}
> What we found in this case is the pre-existing directory was rather old. It
> hung around because we're doing a poor job of cleaning old garbage out of our
> HDFS /tmp directory. We intend to set up a job to delete stuff older than a
> couple of weeks or so out of /tmp but I think the chances of a collision will
> still be high enough that failures like this might still happen on occasion.
> The temp directory Crunch chooses is a random 31-bit value:
> https://github.com/apache/crunch/blob/apache-crunch-0.11.0/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java#L326
> I say 31 bit value because it comes from a 32-bit random integer but only
> includes positive values, thereby excluding 1 bit.
> The following blog post shows some probabilities for 32-bit hash collisions,
> which are essentially the same problem:
> http://preshing.com/20110504/hash-collision-probabilities/
> Since we're dealing with 31 bits instead of 32 the probabilities will be
> higher than expressed there for 32 bits. Even with 32 bits the probability
> of collision is 1 in 100 with just 9292 values.
> I have not done any thorough investigation to understand why, but in our
> production environment we have a lot of Crunch jobs and we are leaving
> 200-300 stray Crunch temp directories per day. Depending on how aggressive
> we get with a scheduled job to clean old stuff out of temp we could still
> have a realistic chance of hitting a collision.
> My proposal is to change the random integer component of the temp path to a
> UUID or something similar to make it drastically more unlikely that a
> collision will ever occur regardless of whether or not "/tmp" is ever cleaned
> up.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)