[ https://issues.apache.org/jira/browse/HADOOP-18891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764615#comment-17764615 ]

ASF GitHub Bot commented on HADOOP-18891:
-----------------------------------------

steveloughran commented on code in PR #6058:
URL: https://github.com/apache/hadoop/pull/6058#discussion_r1324210411


##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java:
##########
@@ -46,6 +46,14 @@ public void initialize() {}
    */
   public abstract boolean shouldCopy(Path path);
 
+  public boolean shouldCopy(CopyListingFileStatus fileStatus){
+    return shouldCopy(fileStatus.getPath());
+  }
+
+  public boolean supportFileStatus(){

Review Comment:
   what's this for? add javadoc to explain.
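
   one possible shape, purely a sketch -- the wording and the default return value here are guesses at the intent from the method name alone:
   
   ```java
   /**
    * Whether this filter can evaluate a full {@link CopyListingFileStatus}
    * via {@link #shouldCopy(CopyListingFileStatus)}, rather than only a
    * {@link Path}. Filters which need file attributes should override this
    * and return true.
    * @return true if file-status based filtering is supported.
    */
   public boolean supportFileStatus() {
     return false;
   }
   ```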



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirCopyFilter extends FileStatusCopyFilter {

Review Comment:
   needs javadocs, and note that our indentation policy is "two spaces"
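
   a sketch of how the class could open, with a guessed javadoc wording and the two-space indentation applied:
   
   ```java
   /**
    * A {@link FileStatusCopyFilter} which accepts only paths that resolve
    * to directories on their filesystem.
    */
   public class DirCopyFilter extends FileStatusCopyFilter {
     private static final Logger LOG =
         LoggerFactory.getLogger(DirCopyFilter.class);
   
     Configuration conf;
   
     public DirCopyFilter(Configuration conf) {
       this.conf = conf;
     }
   }
   ```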



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java:
##########
@@ -46,6 +46,14 @@ public void initialize() {}
    */
   public abstract boolean shouldCopy(Path path);
 
+  public boolean shouldCopy(CopyListingFileStatus fileStatus){

Review Comment:
   needs javadoc
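
   for example -- the description is a guess at the intended contract; the body is unchanged from the patch:
   
   ```java
   /**
    * Decide whether a listing entry should be copied, given its full
    * {@link CopyListingFileStatus}. The default implementation delegates
    * to {@link #shouldCopy(Path)} with the entry's path.
    * @param fileStatus listing entry to evaluate.
    * @return true if the entry should be copied.
    */
   public boolean shouldCopy(CopyListingFileStatus fileStatus) {
     return shouldCopy(fileStatus.getPath());
   }
   ```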



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+    private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");
+
+        assert !listingFilePathString.equals("")
+                : "Couldn't find listing file. Invalid input.";
+        return new Path(listingFilePathString);
+    }
+
+    private SequenceFile.Reader getListingFileReader(Configuration conf) {
+        Path listingFilePath = getListingFilePath(conf);
+
+        try {
+            FileSystem fs = listingFilePath.getFileSystem(conf);
+            if (!fs.exists(listingFilePath)) {
+                throw new IllegalArgumentException("Listing file doesn't exist at: "
+                        + listingFilePath);
+            }
+            return new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(listingFilePath));
+        } catch (IOException exception) {
+            LOG.error("Couldn't find listing file at: " + listingFilePath, exception);
+            throw new IllegalArgumentException("Couldn't find listing-file at: "

Review Comment:
   make this an `IllegalStateException` for consistency with the checkState.
   
   we will need tests to simulate these failures
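
   a sketch of such a test, assuming the switch to `IllegalStateException` and that `getNumSplits()` reads `"mapred.map.tasks"` the way the existing test does; `LambdaTestUtils` is the hadoop-common test helper (`org.apache.hadoop.test.LambdaTestUtils`):
   
   ```java
   @Test
   public void testMissingListingFileRejected() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt("mapred.map.tasks", 2);
     // point the input format at a listing file which does not exist
     conf.set("distcp.listing.file.path", "/tmp/no-such-dir/fileList.seq");
     JobContext jobContext = new JobContextImpl(conf, new JobID());
     LambdaTestUtils.intercept(IllegalStateException.class, () -> {
       return new UniformRecordInputFormat().getSplits(jobContext);
     });
   }
   ```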



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {

Review Comment:
   1. rename to `tearDownClass()` and move it up to just below the setup method
   2. wrap the shutdown with `if (cluster != null) {...}` (sketch below)
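
   i.e. something like:
   
   ```java
   @AfterClass
   public static void tearDownClass() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
   ```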



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;

Review Comment:
   add a comment to explain
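
   if the intent is what it looks like (directory entries are skipped because their children show up as their own records), a comment along these lines would cover it; the wording is an assumption:
   
   ```java
   // listStatus() on a plain file returns a single entry for the file
   // itself; more than one entry means this record is a directory, whose
   // children are separate records, so its size is not added here.
   if (fileStatus.length > 1) {
     continue;
   }
   ```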



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      Assert.assertTrue(recordCnt == recordPerMap || recordCnt == (recordPerMap + 1));
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+  }
+
+  private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+    long lastEnd = 0;
+
+    //Verify if each split's start is matching with the previous end and
+    //we are not missing anything
+    for (InputSplit split : splits) {
+      FileSplit fileSplit = (FileSplit) split;
+      long start = fileSplit.getStart();
+      Assert.assertEquals(lastEnd, start);
+      lastEnd = start + fileSplit.getLength();
+    }
+
+    //Verify there is nothing more to read from the input file
+    SequenceFile.Reader reader

Review Comment:
   or use a try-with-resources for the automatic close
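
   a sketch of that, with the tail check guessed since the quoted hunk cuts off here:
   
   ```java
   // verify there is nothing left in the listing file past the last split
   try (SequenceFile.Reader reader = new SequenceFile.Reader(
       cluster.getFileSystem().getConf(),
       SequenceFile.Reader.file(listFile))) {
     reader.seek(lastEnd);
     Assert.assertFalse("Unexpected record beyond the final split",
         reader.next(new Text(), new CopyListingFileStatus()));
   }
   ```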



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+    private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");
+
+        assert !listingFilePathString.equals("")
+                : "Couldn't find listing file. Invalid input.";
+        return new Path(listingFilePathString);
+    }
+
+    private SequenceFile.Reader getListingFileReader(Configuration conf) {
+        Path listingFilePath = getListingFilePath(conf);
+
+        try {
+            FileSystem fs = listingFilePath.getFileSystem(conf);
+            if (!fs.exists(listingFilePath)) {

Review Comment:
   use `org.apache.hadoop.util.Preconditions`
   
   ```java
   checkState(fs.exists(...), "listing file doesn't exist at %s", listingFilePath);
   ```
   
     



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {

Review Comment:
   extends AbstractHadoopTest



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {

Review Comment:
   can you give this a name like "setupClass" as setup() is generally used for per-test-case setup
   



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;

Review Comment:
   afraid you need to use the same import ordering as everything else; it's a real source of merge pain and we try to stay in control (example below)
   
   java*
   
   non-apache.*
   
   org.apache.*
   
   static *
   



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);

Review Comment:
   don't close the filesystem; it is needed for the other tests
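
   i.e. clean up only the stream, so the shared, cached FileSystem instance stays open for the rest of the class:
   
   ```java
   } finally {
     // close only the stream; the FileSystem instance is shared and cached
     IOUtils.cleanupWithLogger(null, outputStream);
   }
   ```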



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+    private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");

Review Comment:
   must always reference constant strings for ease of maintenance
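
   e.g., assuming the existing `DistCpConstants.CONF_LABEL_LISTING_FILE_PATH` constant is the one to reuse for this key:
   
   ```java
   String listingFilePathString = configuration.get(
       DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
   ```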



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirCopyFilter extends FileStatusCopyFilter {
+    private static final Logger LOG = LoggerFactory.getLogger(DirCopyFilter.class);
+    Configuration conf;
+
+    public DirCopyFilter(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public boolean shouldCopy(Path path) {
+        try {
+            FileSystem fs = path.getFileSystem(this.conf);
+            if (fs.getFileStatus(path).isDirectory()) {
+                return true;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);

Review Comment:
   RuntimeIOException
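
   if no Hadoop-specific wrapper is available on this branch, the JDK's `java.io.UncheckedIOException` has the same shape; a sketch:
   
   ```java
   } catch (IOException e) {
     throw new UncheckedIOException("Failed to resolve status of " + path, e);
   }
   ```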



##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+    private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        int numSplits = getNumSplits(conf);
+        if (numSplits == 0) return new ArrayList();
+
+        return createSplits(conf, numSplits, getNumberOfRecords(conf));
+    }
+
+    private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+        List<InputSplit> splits = new ArrayList(numSplits);
+        long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Average records per map: " + nRecordsPerSplit +
+                    ", Number of maps: " + numSplits + ", total records: " + numRecords);
+        }
+
+        Path listingFilePath = getListingFilePath(configuration);
+        CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+        Text srcRelPath = new Text();
+        long lastPosition = 0L;
+        long count = 0L;
+        long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+        SequenceFile.Reader reader = null;
+        try {
+            reader = getListingFileReader(configuration);
+            while (reader.next(srcRelPath, srcFileStatus)) {
+                count++;
+
+                if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+                        (remains == 0 && count % nRecordsPerSplit == 0)){
+
+                    long currentPosition = reader.getPosition();
+                    FileSplit split = new FileSplit(listingFilePath, lastPosition,
+                            currentPosition - lastPosition, null);
+                    if(LOG.isDebugEnabled()){
+                        LOG.debug("Creating split: " + split + ", records in split: " + count);
+                    }
+
+                    splits.add(split);
+                    lastPosition = currentPosition;
+                    if(remains > 0){
+                        remains--;
+                    }
+                    count = 0L;
+                }
+            }
+
+            return splits;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+            InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+    }
+
+    private static Path getListingFilePath(Configuration configuration) {
+        String listingFilePathString =
+                configuration.get("distcp.listing.file.path", "");
+
+        assert !listingFilePathString.equals("")
+                : "Couldn't find listing file. Invalid input.";
+        return new Path(listingFilePathString);
+    }
+
+    private SequenceFile.Reader getListingFileReader(Configuration conf) {
+        Path listingFilePath = getListingFilePath(conf);
+
+        try {
+            FileSystem fs = listingFilePath.getFileSystem(conf);
+            if (!fs.exists(listingFilePath)) {
+                throw new IllegalArgumentException("Listing file doesn't exist at: "
+                        + listingFilePath);
+            }
+            return new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(listingFilePath));
+        } catch (IOException exception) {
+            LOG.error("Couldn't find listing file at: " + listingFilePath, exception);

Review Comment:
   use the slf4j format: `"couldn't read {}", listingFilePath`
   
   using "read" over "find", as there could be other reasons for the failure (permissions etc.)
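
   i.e. something like:
   
   ```java
   LOG.error("Couldn't read listing file at {}", listingFilePath, exception);
   ```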



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      Assert.assertTrue(recordCnt == recordPerMap || recordCnt == (recordPerMap + 1));

Review Comment:
   use AssertJ's assertThat(), e.g.
   
   ```java
   // recordPerMap is a long, so compare as long to keep the types aligned.
   Assertions.assertThat((long) recordCnt)
       .describedAs("record count")
       .isGreaterThanOrEqualTo(recordPerMap)
       .isLessThanOrEqualTo(recordPerMap + 1);
   ```
   
   AssertJ gives wonderful error messages on assertion failures.
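   
   this assumes the usual AssertJ import, with assertj-core on (or added to) the test classpath:
   
   ```java
   import org.assertj.core.api.Assertions;
   // or, for terser call sites:
   // import static org.assertj.core.api.Assertions.assertThat;
   ```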
   



##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+  private static MiniDFSCluster cluster;
+  private static final int N_FILES = 20;
+  private static final int SIZEOF_EACH_FILE = 1024;
+  private static final Random random = new Random();
+  private static int totalFileSize = 0;
+
+  private static final Credentials CREDENTIALS = new Credentials();
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+                                          .format(true).build();
+    totalFileSize = 0;
+
+    for (int i=0; i<N_FILES; ++i)
+      totalFileSize += createFile("/tmp/source/" + String.valueOf(i), SIZEOF_EACH_FILE);
+  }
+
+  private static DistCpOptions getOptions(int nMaps) throws Exception {
+    Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/source");
+    Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+                               + "/tmp/target");
+
+    List<Path> sourceList = new ArrayList<Path>();
+    sourceList.add(sourcePath);
+    return new DistCpOptions.Builder(sourceList, targetPath)
+        .maxMaps(nMaps)
+        .build();
+  }
+
+  private static int createFile(String path, int fileSize) throws Exception {
+    FileSystem fileSystem = null;
+    DataOutputStream outputStream = null;
+    try {
+      fileSystem = cluster.getFileSystem();
+      outputStream = fileSystem.create(new Path(path), true, 0);
+      int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+      outputStream.write(new byte[size]);
+      return size;
+    }
+    finally {
+      IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  public void testGetSplits(int nMaps) throws Exception {
+    DistCpContext context = new DistCpContext(getOptions(nMaps));
+    Configuration configuration = new Configuration();
+    configuration.set("mapred.map.tasks", String.valueOf(context.getMaxMaps()));
+    Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+        + "/tmp/testGetSplits_2/fileList.seq");
+    CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+        .buildListing(listFile, context);
+
+    JobContext jobContext = new JobContextImpl(configuration, new JobID());
+    UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+    List<InputSplit> splits
+            = uniformRecordInputFormat.getSplits(jobContext);
+
+    long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+    long recordPerMap = totalRecords / nMaps;
+
+
+    checkSplits(listFile, splits);
+
+    int doubleCheckedTotalSize = 0;
+    for (int i=0; i < splits.size(); ++i) {
+      InputSplit split = splits.get(i);
+      int currentSplitSize = 0;
+      RecordReader<Text, CopyListingFileStatus> recordReader =
+        uniformRecordInputFormat.createRecordReader(split, null);
+      StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+                                                recordReader, 0);
+      final TaskAttemptContext taskAttemptContext
+         = stubContext.getContext();
+      recordReader.initialize(split, taskAttemptContext);
+      int recordCnt = 0;
+      while (recordReader.nextKeyValue()) {
+        recordCnt++;
+        Path sourcePath = recordReader.getCurrentValue().getPath();
+        FileSystem fs = sourcePath.getFileSystem(configuration);
+        FileStatus fileStatus [] = fs.listStatus(sourcePath);
+        if (fileStatus.length > 1) {
+          continue;
+        }
+        currentSplitSize += fileStatus[0].getLen();
+      }
+
+      Assert.assertTrue(recordCnt == recordPerMap || recordCnt == (recordPerMap + 1));
+      doubleCheckedTotalSize += currentSplitSize;
+    }
+
+    Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);

Review Comment:
   again, AssertJ here and everywhere below.
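   
   for instance, a sketch of the final size check with the existing variable names:
   
   ```java
   Assertions.assertThat(doubleCheckedTotalSize)
       .describedAs("total size across all splits")
       .isEqualTo(totalFileSize);
   ```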





> hadoop distcp needs support to filter by file/directory attribute
> -----------------------------------------------------------------
>
>                 Key: HADOOP-18891
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18891
>             Project: Hadoop Common
>          Issue Type: New Feature
>          Components: tools/distcp
>    Affects Versions: 3.3.6
>         Environment: hadoop3.3.6
>            Reporter: Authur Wang
>            Priority: Major
>              Labels: pull-request-available
>
> In some circumstances, we need to filter files/directories by their attributes: 
> for example, by modification time, by the isDir flag, etc.
> So, *should we introduce a new method public boolean 
> shouldCopy(CopyListingFileStatus fileStatus)*? 
> This approach would be a more flexible way to do this than the existing 
> public abstract boolean shouldCopy(Path path).
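
For illustration, a minimal sketch of what such an attribute-based filter could look like once the proposed overload exists (the class name, constructor, and cutoff handling below are hypothetical, not part of the patch):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.CopyFilter;
import org.apache.hadoop.tools.CopyListingFileStatus;

/**
 * Illustrative only: keeps files whose modification time is at or after a
 * cutoff, relying on the proposed shouldCopy(CopyListingFileStatus) overload.
 */
public class ModificationTimeCopyFilter extends CopyFilter {

  /** Cutoff in epoch milliseconds; files older than this are skipped. */
  private final long cutoffMillis;

  public ModificationTimeCopyFilter(long cutoffMillis) {
    this.cutoffMillis = cutoffMillis;
  }

  @Override
  public boolean shouldCopy(Path path) {
    // A bare Path carries no attributes, so accept here and decide in the
    // attribute-aware overload below.
    return true;
  }

  @Override
  public boolean shouldCopy(CopyListingFileStatus fileStatus) {
    // Keep directories so their children are still listed; filter plain
    // files by modification time.
    return fileStatus.isDirectory()
        || fileStatus.getModificationTime() >= cutoffMillis;
  }
}
```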



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
