[ 
https://issues.apache.org/jira/browse/HADOOP-11794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048044#comment-15048044
 ] 

Yongjun Zhang commented on HADOOP-11794:
----------------------------------------

BTW guys, I was thinking (some ideas are inspired by the patch attached to this 
jira):

1. When creating the file listing, which currently has one entry <srcFile, 
CopyListingFileStatus> per file, we can instead create multiple entries for the 
same file, each entry representing one chunk of the file, and add <offset, 
chunkLength> as two new members of class CopyListingFileStatus.
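Something like the following, just to illustrate item 1 (a standalone sketch with made-up names; ChunkStatus is a simplified stand-in for CopyListingFileStatus, not the real class):

{code}
import java.util.ArrayList;
import java.util.List;

public class ChunkListingSketch {

  /** Simplified stand-in for CopyListingFileStatus with the two proposed fields. */
  static class ChunkStatus {
    final String srcPath;
    final long chunkOffset;   // byte offset of this chunk within the source file
    final long chunkLength;   // number of bytes in this chunk

    ChunkStatus(String srcPath, long chunkOffset, long chunkLength) {
      this.srcPath = srcPath;
      this.chunkOffset = chunkOffset;
      this.chunkLength = chunkLength;
    }
  }

  /** Emit one listing entry per chunk instead of one entry per file. */
  static List<ChunkStatus> toChunks(String srcPath, long fileLen, long chunkSize) {
    List<ChunkStatus> entries = new ArrayList<>();
    for (long off = 0; off < fileLen; off += chunkSize) {
      entries.add(new ChunkStatus(srcPath, off, Math.min(chunkSize, fileLen - off)));
    }
    return entries;
  }
}
{code}

e.g. a 2.5 GB file with a 1 GB chunk size would produce three listing entries, the last one 500 MB long.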

2. We can change the following code in UniformSizeInputFormat.java 
(probably enabled with a command-line switch). Right now it checks whether 
including a whole file would exceed nBytesPerSplit, and decides whether to 
start a new split. Instead of checking the file length, we would check the 
chunk length: if including the chunk would exceed nBytesPerSplit, we don't 
include the chunk in the current split; otherwise we do.

We could probably introduce a new ChunkUniformSizeInputFormat class instead of 
modifying the current one.

{code}
  private List<InputSplit> getSplits(Configuration configuration, int numSplits,
                                     long totalSizeBytes) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
    long nBytesPerSplit = (long) Math.ceil(totalSizeBytes * 1.0 / numSplits);

    CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
    Text srcRelPath = new Text();
    long currentSplitSize = 0;
    long lastSplitStart = 0;
    long lastPosition = 0;

    final Path listingFilePath = getListingFilePath(configuration);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Average bytes per map: " + nBytesPerSplit +
          ", Number of maps: " + numSplits + ", total size: " + totalSizeBytes);
    }
    SequenceFile.Reader reader = null;
    try {
      reader = getListingFileReader(configuration);
      while (reader.next(srcRelPath, srcFileStatus)) {
        // If adding the current file would push this split past the
        // per-split byte limit, close out the current split first.
        if (currentSplitSize + srcFileStatus.getLen() > nBytesPerSplit &&
            lastPosition != 0) {
          FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
              lastPosition - lastSplitStart, null);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Creating split : " + split + ", bytes in split: " +
                currentSplitSize);
          }
          splits.add(split);
          lastSplitStart = lastPosition;
          currentSplitSize = 0;
        }
        currentSplitSize += srcFileStatus.getLen();
        lastPosition = reader.getPosition();
      }
      if (lastPosition > lastSplitStart) {
        FileSplit split = new FileSplit(listingFilePath, lastSplitStart,
            lastPosition - lastSplitStart, null);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Creating split : " + split + ", bytes in split: " +
              currentSplitSize);
        }
        splits.add(split);
      }

    } finally {
      IOUtils.closeStream(reader);
    }

    return splits;
  }
{code}
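For comparison, the chunk-based check in a new ChunkUniformSizeInputFormat could look roughly like this. This is a standalone sketch of the grouping logic only, with made-up names, not the real InputFormat API:

{code}
import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {

  /** Returns how many chunks land in each split; same shape as the
   *  file-based check above, but applied to chunk lengths. */
  static List<Integer> assignChunks(long[] chunkLengths, long nBytesPerSplit) {
    List<Integer> splitSizes = new ArrayList<>();
    long currentSplitSize = 0;
    int chunksInSplit = 0;
    for (long len : chunkLengths) {
      // If adding this chunk would exceed the limit, close out the split.
      if (currentSplitSize + len > nBytesPerSplit && chunksInSplit > 0) {
        splitSizes.add(chunksInSplit);
        currentSplitSize = 0;
        chunksInSplit = 0;
      }
      currentSplitSize += len;
      chunksInSplit++;
    }
    if (chunksInSplit > 0) {
      splitSizes.add(chunksInSplit);
    }
    return splitSizes;
  }
}
{code}

Since chunks are bounded in size, no split can overshoot nBytesPerSplit by more than one chunk, which is the whole point of the change.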

3. The CopyMapper is changed to copy each chunk of the big file, with the chunk 
offset included in the temporary target file name.
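For item 3, the temp-file naming could be as simple as the following (the exact format is my assumption, not from the patch; zero-padding the offset makes lexicographic order match byte order, which is convenient for the committer):

{code}
public class ChunkNameSketch {
  /** Hypothetical per-chunk temp target name,
   *  e.g. ".distcp.tmp.big.bin.00000000001000000000". */
  static String tempChunkName(String targetName, long chunkOffset) {
    return String.format(".distcp.tmp.%s.%020d", targetName, chunkOffset);
  }
}
{code}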

4. Change CopyCommitter to load the copy-listing file, iterate through it, 
and stitch the chunks of each file together into the target file. 
This work is not distributed; a more elegant solution would be to use 
reducers to do the stitching, as Mithun suggested.
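The committer-side stitching in item 4 would essentially be: group the listing entries by target file, sort each group by chunk offset, and hand the ordered list to a concat call (DistributedFileSystem supports concat in HDFS). A standalone sketch of that planning step, with an illustrative ChunkEntry type standing in for the listing record:

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StitchSketch {

  /** Illustrative stand-in for one per-chunk listing record. */
  static class ChunkEntry {
    final String target;
    final long offset;
    ChunkEntry(String target, long offset) {
      this.target = target;
      this.offset = offset;
    }
  }

  /** For each target file, collect its chunk offsets in byte order,
   *  ready to be passed to a single concat call. */
  static Map<String, List<Long>> planConcat(List<ChunkEntry> entries) {
    Map<String, List<Long>> plan = new TreeMap<>();
    for (ChunkEntry e : entries) {
      plan.computeIfAbsent(e.target, k -> new ArrayList<>()).add(e.offset);
    }
    for (List<Long> offsets : plan.values()) {
      Collections.sort(offsets);  // concat sources must be in byte order
    }
    return plan;
  }
}
{code}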

Wonder if this makes sense to you guys.

Thanks.

> distcp can copy blocks in parallel
> ----------------------------------
>
>                 Key: HADOOP-11794
>                 URL: https://issues.apache.org/jira/browse/HADOOP-11794
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: tools/distcp
>    Affects Versions: 0.21.0
>            Reporter: dhruba borthakur
>            Assignee: Yongjun Zhang
>         Attachments: MAPREDUCE-2257.patch
>
>
> The minimum unit of work for a distcp task is a file. We have files that are 
> greater than 1 TB with a block size of 1 GB. If we use distcp to copy these 
> files, the tasks either take a very long time or eventually fail. A better 
> way for distcp would be to copy all the source blocks in parallel, and then 
> stitch the blocks back into files at the destination via the HDFS concat API 
> (HDFS-222)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
