[ https://issues.apache.org/jira/browse/HADOOP-5084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12665831#action_12665831 ]

Michael Fuchs commented on HADOOP-5084:
---------------------------------------

Hi,

I managed to track down the problem. It seems to be a combination of two things
that causes this data loss:

    * I use a string like "hdfs://192.168.1.4:9000/ana" as
"hadoop.hdfs.defaultfs", and I call
fs.setWorkingDirectory("hdfs://192.168.1.4:9000/ana");
    * The Path object I'm using for the output (TextOutputFormat.setOutputPath)
is not one I create myself; it is built from a Path instance returned by
fs.listStatus(...)[0].getPath().
      In detail, I do:
{code}
      Path somePath = fs.listStatus(someOtherPath)[n].getPath();
      Path output = new Path(somePath, "subfolder");
      TextOutputFormat.setOutputPath(jobConf, output);
{code}
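
To make the difference between the two Path objects explicit, here is a minimal
sketch of my own (not part of the failing application; it assumes "fs" is the
HDFS FileSystem obtained as above, and the directory names are placeholders):
{code}
// A Path returned by listStatus() is fully qualified (it carries the scheme
// and authority of the filesystem it came from), so a child built from it
// keeps pointing at the same absolute HDFS location:
Path fromListing = fs.listStatus(new Path("/ana"))[0].getPath();
Path qualifiedOut = new Path(fromListing, "subfolder");
// e.g. hdfs://192.168.1.4:9000/ana/<first entry>/subfolder

// A hand-built relative Path stays relative until something resolves it,
// typically against the FileSystem's working directory:
Path relativeOut = new Path(new Path("out"), "subfolder");   // just "out/subfolder"
{code}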
I created a small application which shows the issue.
The application takes three arguments:
1) pass "1" to use an implementation which fails, or "0" to use an
implementation which works
2) the "hadoop.hdfs.defaultfs" value
3) the setWorkingDirectory value.

I execute the class like "bin/hadoop jar /tmp/example.jar org.test.TestMR 1
hdfs://BOCK:9000/test/ hdfs://BOCK:9000/test/" and it does not write the output,
but it does not fail either!

{code}
package org.test;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;


public class TestMR implements Mapper<LongWritable, Text, Text, Text>,
                Reducer<Text, Text, Text, Text> {
        static Logger logger = Logger.getLogger(TestMR.class);
        
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> out,
                        Reporter reporter) throws IOException {
                logger.info("map: key=" + key + ", value=" + value);
                out.collect(new Text(key.toString()), value);
        }

        public void configure(JobConf jobConf) {
        }

        public void close() throws IOException {
        }       
        
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> out,
                        Reporter reporter) throws IOException {
                while(values.hasNext()) {
                        Text value = values.next();
                        logger.info("reduce: key=" + key + ", value=" + value);
                        out.collect(new Text(key), new Text(value));
                }
        }

        public static void main(String[] args) throws IOException {
                JobConf jobConf = new JobConf();
                if(args.length < 3) {
                        logger.error("usage: 0|1 hadoop.hdfs.defaultfs 
hadoop.hdfs.workdir");
                        return;
                }
                logger.info("setting defaultfs and workdir");
                jobConf.set("hadoop.hdfs.defaultfs", args[1]);

                FileSystem fs = FileSystem.get(jobConf);
                fs.setWorkingDirectory(new Path(args[2]));
                fs.mkdirs(fs.getWorkingDirectory());

                jobConf.setJobName(TestMR.class.getName());
                jobConf.setOutputKeyClass(Text.class);
                jobConf.setOutputValueClass(Text.class);

                logger.info("in: we use the 'in' directory. We create it and 
put some dummy data in it.");
                fs.delete(new Path(fs.getWorkingDirectory(), "in"), true);
                fs.mkdirs(new Path(fs.getWorkingDirectory(), "in"));
                FSDataOutputStream dataOut = fs.create(new Path(fs.getWorkingDirectory(), "in/data"));
                dataOut.write("some data".getBytes());
                dataOut.sync();
                dataOut.flush();
                dataOut.close();
                
                FileInputFormat.setInputPaths(jobConf, new Path("in"));

                logger.info("out: we use a subdirectory of the 'out' directory 
as the output of your job. Therefor we create the out folder first.");
                fs.delete(new Path(fs.getWorkingDirectory(), "out"), true);
                fs.mkdirs(new Path(fs.getWorkingDirectory(), "out"));
                Path out = null;
                
                if(args.length > 0 && "1".equals(args[0])) {
                        logger.info("using broken folder");
                        out = getBrokenOutFolder();
                } else {
                        logger.info("using working folder");
                        out = getWorkingOutFolder(fs);
                }
                logger.info("setting output using path: " + out.toString());
                TextOutputFormat.setOutputPath(jobConf, new Path(out, "subfolder"));

                jobConf.setMapperClass(TestMR.class);
                jobConf.setReducerClass(TestMR.class);
                jobConf.setJarByClass(TestMR.class);
                
                JobClient jobClient = new JobClient(jobConf);
                RunningJob runningJob = jobClient.submitJob(jobConf);
                
                while(!runningJob.isComplete()) {
                        logger.info("still running...");
                        try {
                                Thread.sleep(1000);
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                }
        }

        private static Path getWorkingOutFolder(FileSystem fs) throws IOException {
                
                Path root = new Path("/test");
                for(FileStatus fileStatus : fs.listStatus(root)) {
                        if("out".equals(fileStatus.getPath().getName())) { 
return fileStatus.getPath(); }
                }
                return null;
        }

        private static Path getBrokenOutFolder() throws IOException {
                return new Path("out");
        }
        
}
{code}
If I start the application like "bin/hadoop jar /tmp/example.jar 
org.test.TestMR 1 hdfs://BOCK:9000/test/ /test/" it works.
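
For what it's worth, a possible workaround I have in mind (untested, just a
sketch): qualify the relative output Path against the FileSystem before handing
it to the output format, so the result no longer depends on how the framework
resolves relative paths later. fs.makeQualified() is the standard FileSystem
method; everything else mirrors the repro above:
{code}
// Sketch only: pin the relative "out" path to the FileSystem's scheme,
// authority and working directory before configuring the job.
Path relativeOut = new Path("out");
Path qualifiedOut = fs.makeQualified(relativeOut);   // e.g. hdfs://BOCK:9000/test/out
TextOutputFormat.setOutputPath(jobConf, new Path(qualifiedOut, "subfolder"));
{code}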


> Reduce output data is not written to disk
> -----------------------------------------
>
>                 Key: HADOOP-5084
>                 URL: https://issues.apache.org/jira/browse/HADOOP-5084
>             Project: Hadoop Core
>          Issue Type: Bug
>    Affects Versions: 0.18.2
>         Environment: Linux version 2.6.22-12-generic (bui...@vernadsky) (gcc 
> version 4.1.3 20070831 (prerelease) (Ubuntu 4.1.2-16ubuntu1)) #1 SMP Sun Sep 
> 23 18:11:30 GMT 2007 running Hadoop 18.2 on two nodes
>            Reporter: Michael Fuchs
>            Priority: Critical
>
> I ran into a critical issue with Hadoop 0.18.2 on my Linux boxes:
> The jobs execute without any complaints and they are listed in the
> succeeded list, but there is no output data besides the "_logs" directory.
> The same code works with 0.17.2.1.
>  
> Here are some sections of the logs:
> [logfile]
> had...@bock:~/logs$ tail hadoop-hadoop-jobtracker-bock.log
> 2008-12-23 13:30:56,707 INFO org.apache.hadoop.mapred.JobInProgress:
> Choosing a data-local task task_200812231229_0031_m_000001 for
> speculation
> 2008-12-23 13:30:56,707 INFO org.apache.hadoop.mapred.JobTracker: Adding
> task 'attempt_200812231229_0031_m_000001_1' to tip
> task_200812231229_0031_m_000001, for tracker
> 'tracker_bock:localhost/127.0.0.1:15260'
> 2008-12-23 13:31:01,065 INFO org.apache.hadoop.mapred.JobInProgress:
> Task 'attempt_200812231229_0031_m_000001_1' has completed
> task_200812231229_0031_m_000001 successfully.
> 2008-12-23 13:31:03,177 INFO org.apache.hadoop.mapred.TaskRunner: Saved
> output of task 'attempt_200812231229_0031_r_000000_0' to
> hdfs://BOCK:9000/ana/oiprocessed/2008/12/23/Sen1/92a74190-2038-4c79-82c4-2de6fdc615db
> [/logfile]
> But the folder contains only a "_logs" folder which has a history file
> which contains:
> [logfile]
> Job JOBID="job_200812231415_0001" FINISH_TIME="1230038377844"
> JOB_STATUS="SUCCESS" FINISHED_MAPS="2" FINISHED_REDUCES="1"
> FAILED_MAPS="0" FAILED_REDUCES="0" COUNTERS="Job Counters .Data-local
> map tasks:2,Job Counters .Launched reduce tasks:1,Job Counters .Launched
> map tasks:3,Map-Reduce Framework.Reduce input records:61,Map-Reduce
> Framework.Map output records:61,Map-Reduce Framework.Map output
> bytes:7194,Map-Reduce Framework.Combine output records:0,Map-Reduce
> Framework.Map input records:61,Map-Reduce Framework.Reduce input
> groups:12,Map-Reduce Framework.Combine input records:0,Map-Reduce
> Framework.Map input bytes:36396,Map-Reduce Framework.Reduce output
> records:12,File Systems.HDFS bytes written:1533,File Systems.Local bytes
> written:14858,File Systems.HDFS bytes read:38679,File Systems.Local
> bytes
> read:7388,com..ana.scheduling.HadoopTask$Counter.MAPPEED:61
> "
> [/logfile]
> So what I see is that the system runs successfully and it even says it
> writes data! ("Map-Reduce Framework.Reduce output records:12,File 
> Systems.HDFS bytes written:1533")
> If I run the same code with 0.17.2.1, or in local mode with 0.18.2, it works
> and I get a part-0000 file with the expected data.
>  
> Please tell me if you need additional information.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
