[ https://issues.apache.org/jira/browse/CRUNCH-685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836420#comment-16836420 ]

Ben Roling edited comment on CRUNCH-685 at 5/9/19 2:16 PM:
-----------------------------------------------------------

This is something Nate and I discussed offline.  He hints at the solution we 
were thinking about in the Jira title.  Basically, update the 
Source.fileSystem() and Target.fileSystem() method implementations to only 
apply config properties with whitelisted property name patterns.  For example, 
"dfs.*" ** and "fs.*".  This way if the FileSystem object happens to have a 
Configuration with a bunch of other stuff on it, that other stuff doesn't get 
put into the formatBundle config.  As Nate suggested, a cached FileSystem 
object can easily be attached to a job configuration that has a whole bunch of 
other properties that were associated with a previously run job.
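
To make the caching hazard concrete, here is a minimal, dependency-free sketch of the behavior described above.  This is not Hadoop's actual code (the real cache lives in org.apache.hadoop.fs.FileSystem and is keyed on scheme, authority, and UGI); the point is only that the cache key ignores the Configuration argument, so a later caller can receive an instance built with an earlier job's config:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in (not Hadoop's real implementation) for the FileSystem
// cache: the key is the scheme/authority only, so on a cache hit the conf
// argument is ignored and the caller gets the instance built with the
// *first* caller's configuration, stale properties and all.
class FsCacheSketch {
    static class Conf extends HashMap<String, String> {}

    static class Fs {
        final Conf conf;
        Fs(Conf conf) { this.conf = conf; }
    }

    private static final Map<String, Fs> CACHE = new HashMap<>();

    static Fs get(String schemeAndAuthority, Conf conf) {
        // computeIfAbsent only consults conf when there is no cached entry.
        return CACHE.computeIfAbsent(schemeAndAuthority, k -> new Fs(conf));
    }
}
```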

There is some risk that the property name pattern whitelist will be incorrect.  
It would be ideal to have a configuration mechanism to override that if 
necessary so as not to require a new Crunch version to get a different 
whitelist.
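
As a rough illustration, the filtering could look something like the following.  The names here are hypothetical, not the actual Crunch API; it only shows the idea of copying a property into the bundle when its name matches a whitelisted pattern such as "dfs.*" or "fs.*":

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of the proposed whitelist filtering, not Crunch code.
public class FsConfigFilter {

    // Default whitelist; a real implementation could let a config property
    // override this list so changing it would not require a new Crunch release.
    static final List<Pattern> DEFAULT_WHITELIST = Arrays.asList(
        Pattern.compile("dfs\\..*"),
        Pattern.compile("fs\\..*"));

    // Copy only the properties whose names match a whitelisted pattern,
    // leaving job-specific leftovers (e.g. output directories) behind.
    static Map<String, String> filter(Map<String, String> fsConf,
                                      List<Pattern> whitelist) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : fsConf.entrySet()) {
            for (Pattern p : whitelist) {
                if (p.matcher(e.getKey()).matches()) {
                    out.put(e.getKey(), e.getValue());
                    break;
                }
            }
        }
        return out;
    }
}
```

With this shape, a property like "mapreduce.output.fileoutputformat.outputdir" picked up from a cached FileSystem would simply never reach the FormatBundle.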



> Limit Target#fileSystem(FileSystem) to only apply filesystem specific 
> configurations to the FormatBundle
> --------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-685
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-685
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Nathan Schile
>            Assignee: Josh Wills
>            Priority: Major
>
> I have an application that runs multiple Crunch pipelines. The first pipeline 
> (P1) reads data from HDFS and completes successfully. The second pipeline 
> (P2) writes data to the same HDFS that was used in the P1 pipeline. The 
> Target configuration for the P2 pipeline is configured by utilizing the 
> Target#fileSystem(FileSystem) method. The P2 pipeline fails when committing 
> the job [1]. It fails when attempting to read a temporary directory from the 
> P1 pipeline, which was already deleted when the P1 pipeline completed.
> The failure occurs because the Hadoop FileSystem class maintains an internal 
> cache [2] of FileSystem instances. The first pipeline creates a FileSystem object that 
> contains the configuration 
> "mapreduce.output.fileoutputformat.outputdir":"hdfs://my-cluster/tmp/crunch-897836570/p2/output".
>  When the P2 pipeline runs it invokes Target#fileSystem(FileSystem) which 
> uses the cached FileSystem from P1 pipeline. The 
> Target#fileSystem(FileSystem) method copies the configuration from the 
> filesystem to the FormatBundle, which causes the erroneous 
> "mapreduce.output.fileoutputformat.outputdir" to be set.
> [1]
> {noformat}
> java.io.FileNotFoundException: File 
> hdfs://my-cluster/tmp/crunch-897836570/p2/output/_temporary/1 does not exist.
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:747)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:113)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:808)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$16.doCall(DistributedFileSystem.java:804)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:804)
>       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1566)
>       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1609)
>       at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:322)
>       at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:392)
>       at 
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:365)
>       at 
> org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
>       at 
> org.apache.crunch.io.CrunchOutputs$CompositeOutputCommitter.commitJob(CrunchOutputs.java:379)
>       at 
> org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.handleJobCommit(CommitterEventHandler.java:285)
>       at 
> org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler$EventProcessor.run(CommitterEventHandler.java:237)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> [2] 
> http://johnjianfang.blogspot.com/2015/03/hadoop-filesystem-internal-cache.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
