steveloughran commented on code in PR #6058:
URL: https://github.com/apache/hadoop/pull/6058#discussion_r1324210411


##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java:
##########
@@ -46,6 +46,14 @@ public void initialize() {}
    */
   public abstract boolean shouldCopy(Path path);
 
+  public boolean shouldCopy(CopyListingFileStatus fileStatus){
+    return shouldCopy(fileStatus.getPath());
+  }
+
+  public boolean supportFileStatus(){

Review Comment:
   What is `supportFileStatus()` for? Please add javadoc to explain it.



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirCopyFilter extends FileStatusCopyFilter {

Review Comment:
   This class needs javadocs. Also note that our indentation policy is two spaces.



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java:
##########
@@ -46,6 +46,14 @@ public void initialize() {}
    */
   public abstract boolean shouldCopy(Path path);
 
+  public boolean shouldCopy(CopyListingFileStatus fileStatus){

Review Comment:
   This `shouldCopy(CopyListingFileStatus)` overload needs javadoc.



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, 
CopyListingFileStatus> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int 
numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / 
numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + 
numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, 
lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in 
split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");
+
+        assert !listingFilePathString.equals("")
+                : "Couldn't find listing file. Invalid input.";
+        return new Path(listingFilePathString);
+    }
+
+    private SequenceFile.Reader getListingFileReader(Configuration conf) {
+        Path listingFilePath = getListingFilePath(conf);
+
+        try {
+            FileSystem fs = listingFilePath.getFileSystem(conf);
+            if (!fs.exists(listingFilePath)) {
+                throw new IllegalArgumentException("Listing file doesn't exist 
at: "
+                        + listingFilePath);
+            }
+            return new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(listingFilePath));
+        } catch (IOException exception) {
+            LOG.error("Couldn't find listing file at: " + listingFilePath, 
exception);
+            throw new IllegalArgumentException("Couldn't find listing-file at: 
"

Review Comment:
   Make this an `IllegalStateException`, for consistency with `Preconditions.checkState`, which also throws `IllegalStateException`.
   
   We will also need tests that simulate these failures.



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), 
SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * 
fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {

Review Comment:
   1. Rename this to `tearDownClass()` and move it up to just below the setup method.
   2. Wrap the shutdown with `if (cluster != null) {...}` so that a failed setup does not cause a `NullPointerException` during teardown.



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), 
SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * 
fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", 
String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new 
UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, 
"mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;

Review Comment:
   Add a comment explaining why entries for which `listStatus` returns more than one status are skipped.



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), 
SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * 
fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", 
String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new 
UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, 
"mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      Assert.assertTrue(recordCnt == recordPerMap || recordCnt == 
(recordPerMap + 1));
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+  }
+
+  private void checkSplits(Path listFile, List<InputSplit> splits) throws 
IOException {
+    long lastEnd = 0;
+
+    //Verify if each split's start is matching with the previous end and
+    //we are not missing anything
+    for (InputSplit split : splits) {
+      FileSplit fileSplit = (FileSplit) split;
+      long start = fileSplit.getStart();
+      Assert.assertEquals(lastEnd, start);
+      lastEnd = start + fileSplit.getLength();
+    }
+
+    //Verify there is nothing more to read from the input file
+    SequenceFile.Reader reader

Review Comment:
   Or use try-with-resources so that the reader is closed automatically.



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, 
CopyListingFileStatus> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int 
numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / 
numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + 
numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, 
lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in 
split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");
+
+        assert !listingFilePathString.equals("")
+                : "Couldn't find listing file. Invalid input.";
+        return new Path(listingFilePathString);
+    }
+
+    private SequenceFile.Reader getListingFileReader(Configuration conf) {
+        Path listingFilePath = getListingFilePath(conf);
+
+        try {
+            FileSystem fs = listingFilePath.getFileSystem(conf);
+            if (!fs.exists(listingFilePath)) {

Review Comment:
   Use `org.apache.hadoop.util.Preconditions`:
   
   ```java
   checkState(fs.exists(...), "listing file doesn't exist at %s", listingFilePath);
   ```
   
     



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {

Review Comment:
   extends AbstractHadoopTest



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {

Review Comment:
   can you give this a name like "setupClass" as setup() is generally used for 
per-test-case setup
   



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;

Review Comment:
   afraid you need to use the same import ordering as everything else - it's a 
real source of merge pain and we try to stay in control
   
   java*
   
   non-apache.*
   
   org.apache.*
   
   static *
   



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), 
SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * 
fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);

Review Comment:
   don't close the filesystem; it is needed for the other tests



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, 
CopyListingFileStatus> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int 
numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / 
numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + 
numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, 
lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in 
split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");

Review Comment:
   must always reference constant strings for ease of maintenance



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirCopyFilter extends FileStatusCopyFilter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DirCopyFilter.class);
+    Configuration conf;
+
+    public DirCopyFilter(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public boolean shouldCopy(Path path) {
+        try {
+            FileSystem fs = path.getFileSystem(this.conf);
+            if (fs.getFileStatus(path).isDirectory()) {
+                return true;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   RuntimeIOException



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, 
CopyListingFileStatus> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int 
numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / 
numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + 
numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, 
lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in 
split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");
+
+        assert !listingFilePathString.equals("")
+                : "Couldn't find listing file. Invalid input.";
+        return new Path(listingFilePathString);
+    }
+
+    private SequenceFile.Reader getListingFileReader(Configuration conf) {
+        Path listingFilePath = getListingFilePath(conf);
+
+        try {
+            FileSystem fs = listingFilePath.getFileSystem(conf);
+            if (!fs.exists(listingFilePath)) {
+                throw new IllegalArgumentException("Listing file doesn't exist 
at: "
+                        + listingFilePath);
+            }
+            return new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(listingFilePath));
+        } catch (IOException exception) {
+            LOG.error("Couldn't find listing file at: " + listingFilePath, 
exception);

Review Comment:
   use the slf4j `"couldn't read {}", listingFilePath` format
   
   using "read" over "find" as there could be other reasons for the failure 
(permissions etc)



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), 
SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * 
fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", 
String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new 
UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, 
"mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      Assert.assertTrue(recordCnt == recordPerMap || recordCnt == 
(recordPerMap + 1));

Review Comment:
   use AssertJ's `assertThat()`, e.g.
   
   ```java
   Assertions.assertThat(recordCnt)
    .describedAs("record count")
    .isGreaterThanOrEqualTo(recordPerMap)
    .isLessThanOrEqualTo(recordPerMap + 1);
   ```
   
   assertj gives wonderful error messages on assertion failures.
   



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), 
SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * 
fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", 
String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new 
UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, 
"mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      Assert.assertTrue(recordCnt == recordPerMap || recordCnt == 
(recordPerMap + 1));
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);

Review Comment:
   Again, use AssertJ here and everywhere below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to