[
https://issues.apache.org/jira/browse/HADOOP-18891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17764615#comment-17764615
]
ASF GitHub Bot commented on HADOOP-18891:
-----------------------------------------
steveloughran commented on code in PR #6058:
URL: https://github.com/apache/hadoop/pull/6058#discussion_r1324210411
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java:
##########
@@ -46,6 +46,14 @@ public void initialize() {}
*/
public abstract boolean shouldCopy(Path path);
+ public boolean shouldCopy(CopyListingFileStatus fileStatus){
+ return shouldCopy(fileStatus.getPath());
+ }
+
+ public boolean supportFileStatus(){
Review Comment:
what's this for? add javadoc to explain.
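A javadoc along these lines would answer the question (a sketch; it assumes the flag tells the copy listing whether the filter can evaluate a full CopyListingFileStatus rather than just a Path, and that the base class defaults to false):
```java
/**
 * Whether this filter can evaluate a full {@link CopyListingFileStatus}
 * rather than just its {@link Path}. Subclasses overriding
 * {@link #shouldCopy(CopyListingFileStatus)} should return true so the
 * richer status object is passed through.
 * @return true if shouldCopy(CopyListingFileStatus) is supported.
 */
public boolean supportFileStatus() {
  return false;
}
```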
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirCopyFilter extends FileStatusCopyFilter {
Review Comment:
needs javadocs, and know that our indentation policy is "two spaces"
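For example (a sketch; the stated purpose of the filter is an assumption drawn from its shouldCopy() logic below):
```java
/**
 * A {@link CopyFilter} which accepts only directories, so a distcp run
 * can reproduce the directory tree without copying file contents.
 */
public class DirCopyFilter extends FileStatusCopyFilter {
```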
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java:
##########
@@ -46,6 +46,14 @@ public void initialize() {}
*/
public abstract boolean shouldCopy(Path path);
+ public boolean shouldCopy(CopyListingFileStatus fileStatus){
Review Comment:
needs javadoc
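Something like the following (grounded in the default implementation shown above):
```java
/**
 * Predicate to decide whether a file may be copied, given its full
 * copy-listing status. The default implementation delegates to
 * {@link #shouldCopy(Path)} with the status's path.
 * @param fileStatus file status to evaluate.
 * @return true if the file represented by fileStatus may be copied.
 */
public boolean shouldCopy(CopyListingFileStatus fileStatus) {
  return shouldCopy(fileStatus.getPath());
}
```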
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+ private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int numSplits = getNumSplits(conf);
+ if (numSplits == 0) return new ArrayList();
+
+ return createSplits(conf, numSplits, getNumberOfRecords(conf));
+ }
+
+ private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+ List<InputSplit> splits = new ArrayList(numSplits);
+ long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Average records per map: " + nRecordsPerSplit +
+ ", Number of maps: " + numSplits + ", total records: " + numRecords);
+ }
+
+ Path listingFilePath = getListingFilePath(configuration);
+ CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+ Text srcRelPath = new Text();
+ long lastPosition = 0L;
+ long count = 0L;
+ long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+ SequenceFile.Reader reader = null;
+ try {
+ reader = getListingFileReader(configuration);
+ while (reader.next(srcRelPath, srcFileStatus)) {
+ count++;
+
+ if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+ (remains == 0 && count % nRecordsPerSplit == 0)){
+
+ long currentPosition = reader.getPosition();
+ FileSplit split = new FileSplit(listingFilePath, lastPosition,
+ currentPosition - lastPosition, null);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Creating split: " + split + ", records in split: " + count);
+ }
+
+ splits.add(split);
+ lastPosition = currentPosition;
+ if(remains > 0){
+ remains--;
+ }
+ count = 0L;
+ }
+ }
+
+ return splits;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+ }
+
+ private static Path getListingFilePath(Configuration configuration) {
+ String listingFilePathString =
+ configuration.get("distcp.listing.file.path", "");
+
+ assert !listingFilePathString.equals("")
+ : "Couldn't find listing file. Invalid input.";
+ return new Path(listingFilePathString);
+ }
+
+ private SequenceFile.Reader getListingFileReader(Configuration conf) {
+ Path listingFilePath = getListingFilePath(conf);
+
+ try {
+ FileSystem fs = listingFilePath.getFileSystem(conf);
+ if (!fs.exists(listingFilePath)) {
+ throw new IllegalArgumentException("Listing file doesn't exist
at: "
+ + listingFilePath);
+ }
+ return new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(listingFilePath));
+ } catch (IOException exception) {
+ LOG.error("Couldn't find listing file at: " + listingFilePath,
exception);
+ throw new IllegalArgumentException("Couldn't find listing-file at:
"
Review Comment:
make this an `IllegalStateException` for consistency with the checkState.
we will need tests to simulate these failures
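A test for that failure path could look like this (a sketch; the test name is made up, it assumes the rethrow becomes an IllegalStateException as suggested, and LambdaTestUtils comes from hadoop-common's test artifact):
```java
@Test
public void testMissingListingFile() throws Throwable {
  Configuration conf = new Configuration();
  // point the input format at a listing file which does not exist
  conf.set("distcp.listing.file.path", "/tmp/no-such-listing.seq");
  LambdaTestUtils.intercept(IllegalStateException.class, () ->
      new UniformRecordInputFormat().getSplits(
          new JobContextImpl(conf, new JobID())));
}
```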
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i),
SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ return new DistCpOptions.Builder(sourceList, targetPath)
+ .maxMaps(nMaps)
+ .build();
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
Review Comment:
1. rename tearDownClass(); move up to just below the setup method
2. wrap the shutdown with `if (cluster != null) {...}`
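i.e. something like:
```java
@AfterClass
public static void tearDownClass() {
  if (cluster != null) {
    cluster.shutdown();
  }
}
```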
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i),
SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ return new DistCpOptions.Builder(sourceList, targetPath)
+ .maxMaps(nMaps)
+ .build();
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ public void testGetSplits(int nMaps) throws Exception {
+ DistCpContext context = new DistCpContext(getOptions(nMaps));
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
String.valueOf(context.getMaxMaps()));
+ Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/testGetSplits_2/fileList.seq");
+ CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+ .buildListing(listFile, context);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+ List<InputSplit> splits
+ = uniformRecordInputFormat.getSplits(jobContext);
+
+ long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+ long recordPerMap = totalRecords / nMaps;
+
+
+ checkSplits(listFile, splits);
+
+ int doubleCheckedTotalSize = 0;
+ for (int i=0; i < splits.size(); ++i) {
+ InputSplit split = splits.get(i);
+ int currentSplitSize = 0;
+ RecordReader<Text, CopyListingFileStatus> recordReader =
+ uniformRecordInputFormat.createRecordReader(split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, 0);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+ recordReader.initialize(split, taskAttemptContext);
+ int recordCnt = 0;
+ while (recordReader.nextKeyValue()) {
+ recordCnt++;
+ Path sourcePath = recordReader.getCurrentValue().getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ FileStatus fileStatus [] = fs.listStatus(sourcePath);
+ if (fileStatus.length > 1) {
+ continue;
Review Comment:
add a comment to explain
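Assuming the branch is there to skip directory entries (whose listStatus() returns children rather than the entry itself), a comment such as:
```java
if (fileStatus.length > 1) {
  // directory entry: listStatus() returned its children, whose sizes
  // are accounted for when their own records are processed
  continue;
}
```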
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i),
SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ return new DistCpOptions.Builder(sourceList, targetPath)
+ .maxMaps(nMaps)
+ .build();
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ public void testGetSplits(int nMaps) throws Exception {
+ DistCpContext context = new DistCpContext(getOptions(nMaps));
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
String.valueOf(context.getMaxMaps()));
+ Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/testGetSplits_2/fileList.seq");
+ CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+ .buildListing(listFile, context);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+ List<InputSplit> splits
+ = uniformRecordInputFormat.getSplits(jobContext);
+
+ long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+ long recordPerMap = totalRecords / nMaps;
+
+
+ checkSplits(listFile, splits);
+
+ int doubleCheckedTotalSize = 0;
+ for (int i=0; i < splits.size(); ++i) {
+ InputSplit split = splits.get(i);
+ int currentSplitSize = 0;
+ RecordReader<Text, CopyListingFileStatus> recordReader =
+ uniformRecordInputFormat.createRecordReader(split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, 0);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+ recordReader.initialize(split, taskAttemptContext);
+ int recordCnt = 0;
+ while (recordReader.nextKeyValue()) {
+ recordCnt++;
+ Path sourcePath = recordReader.getCurrentValue().getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ FileStatus fileStatus [] = fs.listStatus(sourcePath);
+ if (fileStatus.length > 1) {
+ continue;
+ }
+ currentSplitSize += fileStatus[0].getLen();
+ }
+
+ Assert.assertTrue(recordCnt == recordPerMap || recordCnt == (recordPerMap + 1));
+ doubleCheckedTotalSize += currentSplitSize;
+ }
+
+ Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
+ }
+
+ private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
+ long lastEnd = 0;
+
+ //Verify if each split's start is matching with the previous end and
+ //we are not missing anything
+ for (InputSplit split : splits) {
+ FileSplit fileSplit = (FileSplit) split;
+ long start = fileSplit.getStart();
+ Assert.assertEquals(lastEnd, start);
+ lastEnd = start + fileSplit.getLength();
+ }
+
+ //Verify there is nothing more to read from the input file
+ SequenceFile.Reader reader
Review Comment:
or use a try-with-resources for the automatic close
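SequenceFile.Reader is Closeable, so (a sketch, assuming the reader is opened on listFile):
```java
try (SequenceFile.Reader reader = new SequenceFile.Reader(new Configuration(),
    SequenceFile.Reader.file(listFile))) {
  reader.seek(lastEnd);
  // nothing should be readable beyond the end of the final split
}
```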
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+ private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int numSplits = getNumSplits(conf);
+ if (numSplits == 0) return new ArrayList();
+
+ return createSplits(conf, numSplits, getNumberOfRecords(conf));
+ }
+
+ private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+ List<InputSplit> splits = new ArrayList(numSplits);
+ long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Average records per map: " + nRecordsPerSplit +
+ ", Number of maps: " + numSplits + ", total records: " + numRecords);
+ }
+
+ Path listingFilePath = getListingFilePath(configuration);
+ CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+ Text srcRelPath = new Text();
+ long lastPosition = 0L;
+ long count = 0L;
+ long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+ SequenceFile.Reader reader = null;
+ try {
+ reader = getListingFileReader(configuration);
+ while (reader.next(srcRelPath, srcFileStatus)) {
+ count++;
+
+ if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+ (remains == 0 && count % nRecordsPerSplit == 0)){
+
+ long currentPosition = reader.getPosition();
+ FileSplit split = new FileSplit(listingFilePath, lastPosition,
+ currentPosition - lastPosition, null);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Creating split: " + split + ", records in split: " + count);
+ }
+
+ splits.add(split);
+ lastPosition = currentPosition;
+ if(remains > 0){
+ remains--;
+ }
+ count = 0L;
+ }
+ }
+
+ return splits;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+ }
+
+ private static Path getListingFilePath(Configuration configuration) {
+ String listingFilePathString =
+ configuration.get("distcp.listing.file.path", "");
+
+ assert !listingFilePathString.equals("")
+ : "Couldn't find listing file. Invalid input.";
+ return new Path(listingFilePathString);
+ }
+
+ private SequenceFile.Reader getListingFileReader(Configuration conf) {
+ Path listingFilePath = getListingFilePath(conf);
+
+ try {
+ FileSystem fs = listingFilePath.getFileSystem(conf);
+ if (!fs.exists(listingFilePath)) {
Review Comment:
use `org.apache.hadoop.util.Preconditions`
```java
checkState(fs.exists(...), "listing file doesn't exist at %s",
listingFilePath);
```
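With the matching static import, the existing if/throw collapses to a single call (assuming org.apache.hadoop.util.Preconditions.checkState accepts a %s-style message template, as its Guava counterpart does):
```java
import static org.apache.hadoop.util.Preconditions.checkState;

checkState(fs.exists(listingFilePath),
    "listing file doesn't exist at %s", listingFilePath);
```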
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
Review Comment:
extends AbstractHadoopTest
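(presumably org.apache.hadoop.test.AbstractHadoopTestBase, which supplies per-test timeouts and thread naming)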
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
Review Comment:
can you give this a name like "setupClass" as setup() is generally used for
per-test-case setup
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
Review Comment:
afraid you need to use the same import ordering as everything else - it's a
real source of merge pain and we try to stay in control:
java*
non-apache.*
org.apache.*
static *
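Applied to this file's imports, that ordering would be:
```java
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
```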
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i),
SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ return new DistCpOptions.Builder(sourceList, targetPath)
+ .maxMaps(nMaps)
+ .build();
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
Review Comment:
don't close the filesystem; it is needed for the other tests
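i.e. clean up only the stream opened in this method:
```java
} finally {
  // the MiniDFSCluster FileSystem instance is shared across test cases
  IOUtils.cleanupWithLogger(null, outputStream);
}
```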
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+ private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int numSplits = getNumSplits(conf);
+ if (numSplits == 0) return new ArrayList();
+
+ return createSplits(conf, numSplits, getNumberOfRecords(conf));
+ }
+
+ private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+ List<InputSplit> splits = new ArrayList(numSplits);
+ long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Average records per map: " + nRecordsPerSplit +
+ ", Number of maps: " + numSplits + ", total records: " + numRecords);
+ }
+
+ Path listingFilePath = getListingFilePath(configuration);
+ CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+ Text srcRelPath = new Text();
+ long lastPosition = 0L;
+ long count = 0L;
+ long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+ SequenceFile.Reader reader = null;
+ try {
+ reader = getListingFileReader(configuration);
+ while (reader.next(srcRelPath, srcFileStatus)) {
+ count++;
+
+ if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+ (remains == 0 && count % nRecordsPerSplit == 0)){
+
+ long currentPosition = reader.getPosition();
+ FileSplit split = new FileSplit(listingFilePath, lastPosition,
+ currentPosition - lastPosition, null);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Creating split: " + split + ", records in split: " + count);
+ }
+
+ splits.add(split);
+ lastPosition = currentPosition;
+ if(remains > 0){
+ remains--;
+ }
+ count = 0L;
+ }
+ }
+
+ return splits;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+ }
+
+ private static Path getListingFilePath(Configuration configuration) {
+ String listingFilePathString =
+ configuration.get("distcp.listing.file.path", "");
Review Comment:
must always reference constant strings for ease of maintenance
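DistCpConstants.CONF_LABEL_LISTING_FILE_PATH already maps to "distcp.listing.file.path", so the lookup can become:
```java
String listingFilePathString =
    configuration.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
```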
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DirCopyFilter.java:
##########
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirCopyFilter extends FileStatusCopyFilter {
+ private static final Logger LOG = LoggerFactory.getLogger(DirCopyFilter.class);
+ Configuration conf;
+
+ public DirCopyFilter(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public boolean shouldCopy(Path path) {
+ try {
+ FileSystem fs = path.getFileSystem(this.conf);
+ if (fs.getFileStatus(path).isDirectory()) {
+ return true;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
Review Comment:
RuntimeIOException
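i.e. rethrow as an IO-specific unchecked exception rather than a bare RuntimeException. If no RuntimeIOException class is on the classpath, the JDK's java.io.UncheckedIOException expresses the same idea (a sketch):
```java
} catch (IOException e) {
  // preserve the IOException while keeping shouldCopy() unchecked
  throw new UncheckedIOException("failed to check " + path, e);
}
```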
##########
hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/UniformRecordInputFormat.java:
##########
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class UniformRecordInputFormat extends InputFormat<Text, CopyListingFileStatus> {
+ private static final Logger LOG = LoggerFactory.getLogger(UniformRecordInputFormat.class);
+
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ int numSplits = getNumSplits(conf);
+ if (numSplits == 0) return new ArrayList();
+
+ return createSplits(conf, numSplits, getNumberOfRecords(conf));
+ }
+
+ private List<InputSplit> createSplits(Configuration configuration, int numSplits, long numRecords) throws IOException {
+ List<InputSplit> splits = new ArrayList(numSplits);
+ long nRecordsPerSplit = (long) Math.floor(numRecords * 1.0 / numSplits);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Average records per map: " + nRecordsPerSplit +
+ ", Number of maps: " + numSplits + ", total records: " + numRecords);
+ }
+
+ Path listingFilePath = getListingFilePath(configuration);
+ CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
+ Text srcRelPath = new Text();
+ long lastPosition = 0L;
+ long count = 0L;
+ long remains = numRecords - nRecordsPerSplit * (long) numSplits;
+
+ SequenceFile.Reader reader = null;
+ try {
+ reader = getListingFileReader(configuration);
+ while (reader.next(srcRelPath, srcFileStatus)) {
+ count++;
+
+ if((remains > 0 && count % (nRecordsPerSplit + 1) == 0) ||
+ (remains == 0 && count % nRecordsPerSplit == 0)){
+
+ long currentPosition = reader.getPosition();
+ FileSplit split = new FileSplit(listingFilePath, lastPosition,
+ currentPosition - lastPosition, null);
+ if(LOG.isDebugEnabled()){
+ LOG.debug("Creating split: " + split + ", records in split: " + count);
+ }
+
+ splits.add(split);
+ lastPosition = currentPosition;
+ if(remains > 0){
+ remains--;
+ }
+ count = 0L;
+ }
+ }
+
+ return splits;
+ } finally {
+ IOUtils.closeStream(reader);
+ }
+ }
+
+ public RecordReader<Text, CopyListingFileStatus> createRecordReader(
+ InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ return new SequenceFileRecordReader<Text, CopyListingFileStatus>();
+ }
+
+ private static Path getListingFilePath(Configuration configuration) {
+ String listingFilePathString =
+ configuration.get("distcp.listing.file.path", "");
+
+ assert !listingFilePathString.equals("")
+ : "Couldn't find listing file. Invalid input.";
+ return new Path(listingFilePathString);
+ }
+
+ private SequenceFile.Reader getListingFileReader(Configuration conf) {
+ Path listingFilePath = getListingFilePath(conf);
+
+ try {
+ FileSystem fs = listingFilePath.getFileSystem(conf);
+ if (!fs.exists(listingFilePath)) {
+ throw new IllegalArgumentException("Listing file doesn't exist
at: "
+ + listingFilePath);
+ }
+ return new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(listingFilePath));
+ } catch (IOException exception) {
+ LOG.error("Couldn't find listing file at: " + listingFilePath,
exception);
Review Comment:
use slf4j "couldn't read {}", listingFilePAth" format
using "read" over "find" as there could be other reasons for the failure
(permissions etc)
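i.e.:
```java
LOG.error("Couldn't read listing file at {}", listingFilePath, exception);
```
(slf4j still attaches a trailing Throwable as the stack trace when placeholders are used.)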
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i),
SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ return new DistCpOptions.Builder(sourceList, targetPath)
+ .maxMaps(nMaps)
+ .build();
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ public void testGetSplits(int nMaps) throws Exception {
+ DistCpContext context = new DistCpContext(getOptions(nMaps));
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
String.valueOf(context.getMaxMaps()));
+ Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/testGetSplits_2/fileList.seq");
+ CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+ .buildListing(listFile, context);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+ List<InputSplit> splits
+ = uniformRecordInputFormat.getSplits(jobContext);
+
+ long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+ long recordPerMap = totalRecords / nMaps;
+
+
+ checkSplits(listFile, splits);
+
+ int doubleCheckedTotalSize = 0;
+ for (int i=0; i < splits.size(); ++i) {
+ InputSplit split = splits.get(i);
+ int currentSplitSize = 0;
+ RecordReader<Text, CopyListingFileStatus> recordReader =
+ uniformRecordInputFormat.createRecordReader(split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, 0);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+ recordReader.initialize(split, taskAttemptContext);
+ int recordCnt = 0;
+ while (recordReader.nextKeyValue()) {
+ recordCnt++;
+ Path sourcePath = recordReader.getCurrentValue().getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ FileStatus fileStatus [] = fs.listStatus(sourcePath);
+ if (fileStatus.length > 1) {
+ continue;
+ }
+ currentSplitSize += fileStatus[0].getLen();
+ }
+
+ Assert.assertTrue(recordCnt == recordPerMap || recordCnt == (recordPerMap + 1));
Review Comment:
use AssertJ's assertThat(), e.g.
```java
Assertions.assertThat(recordCnt)
    .describedAs("record count")
    .isGreaterThanOrEqualTo(recordPerMap)
    .isLessThanOrEqualTo(recordPerMap + 1);
```
AssertJ gives wonderful error messages on assertion failures.
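(With import static org.assertj.core.api.Assertions.assertThat; the calls shorten to plain assertThat(recordCnt).)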
##########
hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestUniformRecordInputFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.tools.CopyListing;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+import org.apache.hadoop.tools.DistCpContext;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.StubContext;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+public class TestUniformRecordInputFormat {
+ private static MiniDFSCluster cluster;
+ private static final int N_FILES = 20;
+ private static final int SIZEOF_EACH_FILE = 1024;
+ private static final Random random = new Random();
+ private static int totalFileSize = 0;
+
+ private static final Credentials CREDENTIALS = new Credentials();
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ cluster = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1)
+ .format(true).build();
+ totalFileSize = 0;
+
+ for (int i=0; i<N_FILES; ++i)
+ totalFileSize += createFile("/tmp/source/" + String.valueOf(i),
SIZEOF_EACH_FILE);
+ }
+
+ private static DistCpOptions getOptions(int nMaps) throws Exception {
+ Path sourcePath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/source");
+ Path targetPath = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/target");
+
+ List<Path> sourceList = new ArrayList<Path>();
+ sourceList.add(sourcePath);
+ return new DistCpOptions.Builder(sourceList, targetPath)
+ .maxMaps(nMaps)
+ .build();
+ }
+
+ private static int createFile(String path, int fileSize) throws Exception {
+ FileSystem fileSystem = null;
+ DataOutputStream outputStream = null;
+ try {
+ fileSystem = cluster.getFileSystem();
+ outputStream = fileSystem.create(new Path(path), true, 0);
+ int size = (int) Math.ceil(fileSize + (1 - random.nextFloat()) * fileSize);
+ outputStream.write(new byte[size]);
+ return size;
+ }
+ finally {
+ IOUtils.cleanupWithLogger(null, fileSystem, outputStream);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ cluster.shutdown();
+ }
+
+ public void testGetSplits(int nMaps) throws Exception {
+ DistCpContext context = new DistCpContext(getOptions(nMaps));
+ Configuration configuration = new Configuration();
+ configuration.set("mapred.map.tasks",
String.valueOf(context.getMaxMaps()));
+ Path listFile = new Path(cluster.getFileSystem().getUri().toString()
+ + "/tmp/testGetSplits_2/fileList.seq");
+ CopyListing.getCopyListing(configuration, CREDENTIALS, context)
+ .buildListing(listFile, context);
+
+ JobContext jobContext = new JobContextImpl(configuration, new JobID());
+ UniformRecordInputFormat uniformRecordInputFormat = new UniformRecordInputFormat();
+ List<InputSplit> splits
+ = uniformRecordInputFormat.getSplits(jobContext);
+
+ long totalRecords = DistCpUtils.getLong(configuration, "mapred.number.of.records");
+ long recordPerMap = totalRecords / nMaps;
+
+
+ checkSplits(listFile, splits);
+
+ int doubleCheckedTotalSize = 0;
+ for (int i=0; i < splits.size(); ++i) {
+ InputSplit split = splits.get(i);
+ int currentSplitSize = 0;
+ RecordReader<Text, CopyListingFileStatus> recordReader =
+ uniformRecordInputFormat.createRecordReader(split, null);
+ StubContext stubContext = new StubContext(jobContext.getConfiguration(),
+ recordReader, 0);
+ final TaskAttemptContext taskAttemptContext
+ = stubContext.getContext();
+ recordReader.initialize(split, taskAttemptContext);
+ int recordCnt = 0;
+ while (recordReader.nextKeyValue()) {
+ recordCnt++;
+ Path sourcePath = recordReader.getCurrentValue().getPath();
+ FileSystem fs = sourcePath.getFileSystem(configuration);
+ FileStatus fileStatus [] = fs.listStatus(sourcePath);
+ if (fileStatus.length > 1) {
+ continue;
+ }
+ currentSplitSize += fileStatus[0].getLen();
+ }
+
+ Assert.assertTrue(recordCnt == recordPerMap || recordCnt == (recordPerMap + 1));
+ doubleCheckedTotalSize += currentSplitSize;
+ }
+
+ Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
Review Comment:
again, assertJ here and everywhere below
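For the final size check, for example:
```java
Assertions.assertThat(doubleCheckedTotalSize)
    .describedAs("total size of all files across all splits")
    .isEqualTo(totalFileSize);
```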
> hadoop distcp needs support to filter by file/directory attribute
> -----------------------------------------------------------------
>
> Key: HADOOP-18891
> URL: https://issues.apache.org/jira/browse/HADOOP-18891
> Project: Hadoop Common
> Issue Type: New Feature
> Components: tools/distcp
> Affects Versions: 3.3.6
> Environment: hadoop3.3.6
> Reporter: Authur Wang
> Priority: Major
> Labels: pull-request-available
>
> In some circumstances, we need to filter files/directories by file/directory attributes.
> For example, we need to filter them out by file modified time, isDir attrs, etc.
> So, *should we introduce a new method public boolean
> shouldCopy(CopyListingFileStatus fileStatus)*?
> With this approach, we can offer a more fluent way to do things than
> public abstract boolean shouldCopy(Path path).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]