[ https://issues.apache.org/jira/browse/CRUNCH-685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836646#comment-16836646 ]
Andrew Olson commented on CRUNCH-685:
-------------------------------------

This appears similar to a problem we encountered with the recently added DistCp job (see CRUNCH-660). The solution we landed on was cloning the pipeline configuration and then unsetting a specific handful of properties that were interfering with the DistCp being able to run successfully:

{noformat}
Configuration distCpConf = new Configuration(conf);
// Remove unnecessary and problematic properties from the DistCp configuration. This is
// necessary since files referenced by these properties may have already been deleted
// when the DistCp is being started.
distCpConf.unset("mapreduce.job.cache.files");
distCpConf.unset("mapreduce.job.classpath.files");
distCpConf.unset("tmpjars");
{noformat}

> Limit Target#fileSystem(FileSystem) to only apply filesystem specific configurations to the FormatBundle
> --------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-685
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-685
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Nathan Schile
>            Assignee: Josh Wills
>            Priority: Major
>
> I have an application that runs multiple Crunch pipelines. The first pipeline (P1) reads data from HDFS and completes successfully. The second pipeline (P2) writes data to the same HDFS that was used in the P1 pipeline. The Target configuration for the P2 pipeline is configured by utilizing the Target#fileSystem(FileSystem) method. The P2 pipeline fails when committing the job [1]. It fails when attempting to read a temporary directory from the P1 pipeline, which was already deleted when the P1 pipeline completed.
>
> The failure occurs because Hadoop's FileSystem uses an internal cache [2] to cache FileSystem instances.
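The cache interaction described above, and the kind of filtering the issue title proposes, can be sketched as follows. This is a simplified, hypothetical illustration using plain maps in place of Hadoop's Configuration and FormatBundle classes; the `fs.`-prefix heuristic for deciding which keys are "filesystem specific" is an assumption for illustration, not the actual Crunch change.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: copy only filesystem-specific properties (here assumed to be
// keys starting with "fs.") from a cached FileSystem's configuration into the
// FormatBundle, so job-scoped keys such as "mapreduce.output.fileoutputformat.outputdir"
// left over from an earlier pipeline are not leaked into a later one.
public class FsConfigFilter {

    public static Map<String, String> filesystemOnly(Map<String, String> fsConf) {
        Map<String, String> bundleConf = new HashMap<>();
        for (Map.Entry<String, String> e : fsConf.entrySet()) {
            if (e.getKey().startsWith("fs.")) { // keep filesystem-specific keys only
                bundleConf.put(e.getKey(), e.getValue());
            }
        }
        return bundleConf;
    }

    public static void main(String[] args) {
        Map<String, String> fsConf = new HashMap<>();
        fsConf.put("fs.defaultFS", "hdfs://my-cluster");
        // Stale job-scoped key cached from the P1 pipeline's FileSystem instance:
        fsConf.put("mapreduce.output.fileoutputformat.outputdir",
                   "hdfs://my-cluster/tmp/crunch-897836570/p2/output");

        Map<String, String> bundle = filesystemOnly(fsConf);
        System.out.println(bundle.containsKey("fs.defaultFS"));  // true
        System.out.println(bundle.containsKey(
                "mapreduce.output.fileoutputformat.outputdir")); // false
    }
}
```

With such a filter, the P2 pipeline would still inherit genuine filesystem settings from the cached FileSystem, but not the stale output directory that causes the commit failure below.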
> The first pipeline creates a FileSystem object whose configuration contains "mapreduce.output.fileoutputformat.outputdir":"hdfs://my-cluster/tmp/crunch-897836570/p2/output". When the P2 pipeline runs, it invokes Target#fileSystem(FileSystem), which uses the cached FileSystem from the P1 pipeline. The Target#fileSystem(FileSystem) method copies the configuration from the filesystem to the FormatBundle, which causes the erroneous "mapreduce.output.fileoutputformat.outputdir" to be set.
>
> [1]
> {noformat}
> java.io.FileNotFoundException: File hdfs://my-cluster/tmp/crunch-897836570/p2/output/_temporary/1 does not exist.
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:747)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:113)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:808)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:804)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:804)
> 	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1566)
> 	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1609)
> 	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:322)
> 	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:392)
> 	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:365)
> 	at org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
> 	at org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
> 	at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobCommit(CommitterEventHandler.java:285)
> 	at org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:237)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> {noformat}
>
> [2] http://johnjianfang.blogspot.com/2015/03/hadoop-filesystem-internal-cache.html

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)