I'm not using MapReduce; I'm just practicing with the Hadoop common API to
manually split a data file whose data is stored in SequenceFile format
(SequenceFileInputFormat).

The file is split by dividing its length by the total number of tasks. The
InputSplit created for a task is passed to a RecordReader, which reads from
the designated path. The code is below:

    // uses the old org.apache.hadoop.mapred API (InputSplit, FileSplit,
    // SequenceFileRecordReader); conf, LOG and getTaskId() are class members
    private void readPartOfDataFile() throws IOException {
      int taskId = getTaskId();
      InputSplit split = getSplit(taskId);
      SequenceFileRecordReader<Text, CustomData> input =
          new SequenceFileRecordReader<Text, CustomData>(conf, (FileSplit) split);
      Text url = input.createKey();
      CustomData d = input.createValue();
      int count = 0;
      while(input.next(url, d)) {
        count++;
      }
      LOG.info("Task " + taskId + " read " + count + " records");
      input.close();
    }

    private InputSplit getSplit(final int taskId) throws IOException {
      FileSystem fs = FileSystem.get(conf);
      Path filePath = new Path("path/to/", "file");
      FileStatus[] status = fs.listStatus(filePath);
      int maxTasks = conf.getInt("test.maxtasks", 12);
      for(FileStatus file: status) {
        if(file.isDir()) { // each subdirectory holds a SequenceFile named "data"
          Path dataFile = new Path(file.getPath(), "data");
          FileStatus data = fs.getFileStatus(dataFile);
          long dataLength = data.getLen();
          BlockLocation[] locations =
              fs.getFileBlockLocations(data, 0, dataLength);
          if(0 < dataLength) {
            // carve the file into maxTasks byte ranges of roughly equal size
            long chunk = dataLength/(long)maxTasks;
            long beg = (taskId*chunk)+(long)1;
            long end = (taskId+1)*chunk;
            if(maxTasks == (taskId+1)) {
              end = dataLength; // last task takes the remainder
            }
            return new FileSplit(dataFile, beg, end,
                locations[locations.length-1].getHosts());
          } else {
            LOG.info("No Data for file:" + file.getPath());
          }
        }// is dir
      }// for
      return null;
    }
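
For reference, the byte ranges this arithmetic hands to each task can be
printed with a small standalone sketch; the main method and the hard-coded
dataLength = 74250 / maxTasks = 12 values are just for illustration, taken
from the example below, not part of the test itself:

    // same arithmetic as getSplit(), printing the range each task would get
    public static void main(String[] args) {
      long dataLength = 74250;  // approximate file length from the example below
      int maxTasks = 12;
      long chunk = dataLength / (long) maxTasks;  // ~6187 bytes per split
      for (int taskId = 0; taskId < maxTasks; taskId++) {
        long beg = (taskId * chunk) + 1L;
        long end = (taskId + 1) * chunk;
        if (maxTasks == (taskId + 1)) {
          end = dataLength;  // last task takes the remainder
        }
        System.out.println("task " + taskId + ": beg=" + beg + ", end=" + end);
      }
    }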

However, the records read from the data file do not seem to be evenly
distributed across tasks. For instance, one data file contains 1200 records
and its length is around 74250 bytes. With 12 max tasks, each task should get
a split of roughly 6187 bytes. But the logged counts show that each task
reads a varying number of records (e.g. task 4 reads 526 records, task 5
reads 632, task 6 reads 600), and the total number of records read is larger
than the total number of records stored. I checked JobClient.writeOldSplits(),
and it looks similar to the way JobClient divides the data. What am I missing
when splitting data with the Hadoop common API?
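
The per-task counts should add up to the total number of records in the file,
which can be obtained with a plain one-pass count using SequenceFile.Reader and
no splitting. A minimal sketch of that baseline, assuming CustomData has the
usual no-arg constructor of a custom Writable:

    // baseline: count every record in the file in one pass, no splits involved
    private long countAllRecords(Path dataFile) throws IOException {
      FileSystem fs = FileSystem.get(conf);
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, dataFile, conf);
      Text url = new Text();
      CustomData d = new CustomData();
      long total = 0;
      try {
        while (reader.next(url, d)) {
          total++;
        }
      } finally {
        reader.close();
      }
      return total;  // the per-task counts should sum to this
    }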
