On Mon, Dec 03, 2007 at 09:38:57AM -0800, Owen O'Malley wrote:
> 
> On Dec 3, 2007, at 8:59 AM, Eugeny N Dzhurinsky wrote:
> 
>> Hello there!
>> 
>> We would like to start several map/reduce jobs on the same host from several
>> threads using different input and output dirs. However, we are not able to do
>> this for some reason, and we are getting the exception:
>> 
>> 07/12/03 18:53:57 ERROR impl.OurTask: java.io.IOException: Target 
>> /tmp/hadoop-bofh/mapred/local/localRunner/job_local_1.xml already exists
> The problem is that you are using the local runner. For multi-threaded
> access, the distributed mode is a better direction (i.e., bring up a job
> tracker, some set of task trackers, and submit your job to the job
> tracker). If you want to use the local runner, you need to set
> mapred.local.dir differently on each process. A fix for HADOOP-1733 would
> also fix this problem.

I am using the code listed below to submit the job. I set mapred.local.dir to
point to a different location for each job (the UUID is unique):
    
    /**
     * @see java.lang.Runnable#run()
     */
    public void run() {
        try {
            if (observer != null)
                observer.notifyTaskStarted(site);
            JobConf conf = new JobConf();
            conf.set("mapred.local.dir", "/tmp/hadoop-" + site.getUuid());
            conf.setJobName(site.getUuid());
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(CrawlerMapAdapter.class);
            conf.setCombinerClass(CrawlerReduceAdapter.class);
            conf.setReducerClass(CrawlerReduceAdapter.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            final FileSystem fileSystem = FileSystem.get(conf);
            final Path inputPath = new Path(input, site.getUuid());
            final Path outputPath = new Path(output, site.getUuid());
            if (log.isInfoEnabled()) {
                log.info(site.getUrl() + " : Using input " + inputPath);
                log.info(site.getUrl() + " : Using output " + outputPath);
            }
            if (fileSystem.exists(inputPath))
                fileSystem.delete(inputPath);
            if (fileSystem.exists(outputPath))
                fileSystem.delete(outputPath);
            writeDownWebsiteInput(fileSystem, inputPath);
            conf.setInputPath(inputPath);
            conf.setOutputPath(outputPath);

            RunningJob running = JobClient.runJob(conf);
            running.waitForCompletion();
            fileSystem.delete(inputPath);
        } catch (Exception e) {
            log.error(e, e);
            throw new RuntimeException(e);
        } finally {
            if (observer != null)
                observer.notifyTaskFinished(site);
        }
    }

When submitting the job with this code, I can see in the logs that the input
path and the output path are different for different tasks.
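
The per-job directory idea from the code above (one mapred.local.dir per site
UUID) can be sketched on its own, without any Hadoop classes. This is only a
minimal sketch: JobLocalDirs and localDirFor are hypothetical names that are
not part of Hadoop, and the "/tmp" base directory is just the one used in the
code above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

/** Hypothetical helper (not part of Hadoop): derives one scratch directory per job. */
final class JobLocalDirs {
    private JobLocalDirs() {}

    /**
     * Returns base/hadoop-<jobUuid>. Ids that are empty or that could escape
     * the base directory are rejected, so two jobs can only share a directory
     * if they share an id.
     */
    static Path localDirFor(Path base, String jobUuid) {
        if (jobUuid == null || jobUuid.isEmpty()
                || jobUuid.contains("/") || jobUuid.contains("..")) {
            throw new IllegalArgumentException("unusable job id: " + jobUuid);
        }
        return base.resolve("hadoop-" + jobUuid).normalize();
    }

    public static void main(String[] args) {
        Path base = Paths.get("/tmp");
        // Two random ids stand in for site.getUuid() from the code above.
        Path first = localDirFor(base, UUID.randomUUID().toString());
        Path second = localDirFor(base, UUID.randomUUID().toString());
        System.out.println(first);
        System.out.println(second);
    }
}
```

The resulting path would then be passed as a string to
conf.set("mapred.local.dir", ...), exactly as the code above already does.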

When the CrawlerMapAdapter class is invoked by Hadoop to do its job, I dump the
input and output paths with the following code:

            final Path[] paths = conf.getInputPaths();
            if (paths != null && paths.length > 0) {
                log.info("Using input ");
                for (Path path : paths)
                    log.info("Input path " + path);
            }
            log.info("Using output " + conf.getOutputPath());
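
To tell apart log lines that come from concurrently running jobs, each
diagnostic line could also carry the thread name and the job name. A minimal
sketch, assuming a hypothetical helper (PathDiagnostics and describe are not
part of Hadoop); inside the mapper the arguments would presumably come from
Thread.currentThread().getName(), conf.getJobName(), and the paths dumped
above.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Hadoop): formats one diagnostic line per task. */
final class PathDiagnostics {
    private PathDiagnostics() {}

    /** Builds a single line that identifies the thread, the job, and its paths. */
    static String describe(String threadName, String jobName,
                           List<String> inputPaths, String outputPath) {
        return "[" + threadName + "] job=" + jobName
                + " input=" + inputPaths
                + " output=" + outputPath;
    }

    public static void main(String[] args) {
        // Illustrative values only; real values would come from the JobConf.
        System.out.println(describe(
                Thread.currentThread().getName(),
                "uuid-1",
                Arrays.asList("/in/uuid-1"),
                "/out/uuid-1"));
    }
}
```

If two lines show different thread names but the same job name and paths, the
mapper really did receive the same configuration in both threads.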

And I found that the input and output paths are the SAME for different threads,
which is weird. It looks like the same configuration was passed to all
threads. How can that be? Perhaps I misused JobClient?

-- 
Eugene N Dzhurinsky
