This is an automated email from the ASF dual-hosted git repository.
jlowe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c613296 MAPREDUCE-7241. FileInputFormat listStatus with less memory
footprint. Contributed by Zhihua Deng
c613296 is described below
commit c613296dc85ac7b22c171c84f578106b315cc012
Author: Jason Lowe <[email protected]>
AuthorDate: Wed Apr 1 07:45:42 2020 -0500
MAPREDUCE-7241. FileInputFormat listStatus with less memory footprint.
Contributed by Zhihua Deng
---
.../org/apache/hadoop/mapred/FileInputFormat.java | 6 +-
.../hadoop/mapred/LocatedFileStatusFetcher.java | 3 +-
.../mapreduce/lib/input/FileInputFormat.java | 37 ++++++++--
.../mapreduce/lib/input/TestFileInputFormat.java | 79 ++++++++++++++++++++--
4 files changed, 114 insertions(+), 11 deletions(-)
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index b3e2b4a..b738037 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -193,7 +193,8 @@ public abstract class FileInputFormat<K, V> implements
InputFormat<K, V> {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
- result.add(stat);
+ result.add(org.apache.hadoop.mapreduce.lib.input.
+ FileInputFormat.shrinkStatus(stat));
}
}
}
@@ -290,7 +291,8 @@ public abstract class FileInputFormat<K, V> implements
InputFormat<K, V> {
addInputPathRecursively(result, fs, stat.getPath(),
inputFilter);
} else {
- result.add(stat);
+ result.add(org.apache.hadoop.mapreduce.lib.input.
+ FileInputFormat.shrinkStatus(stat));
}
}
}
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index a248f14..4cb36a5 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -259,7 +259,8 @@ public class LocatedFileStatusFetcher {
if (recursive && stat.isDirectory()) {
result.dirsNeedingRecursiveCalls.add(stat);
} else {
- result.locatedFileStatuses.add(stat);
+ result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib.
+ input.FileInputFormat.shrinkStatus(stat));
}
}
}
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 22efe14..1b3365c 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -325,7 +325,7 @@ public abstract class FileInputFormat<K, V> extends
InputFormat<K, V> {
addInputPathRecursively(result, fs, stat.getPath(),
inputFilter);
} else {
- result.add(stat);
+ result.add(shrinkStatus(stat));
}
}
}
@@ -364,13 +364,42 @@ public abstract class FileInputFormat<K, V> extends
InputFormat<K, V> {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
- result.add(stat);
+ result.add(shrinkStatus(stat));
}
}
}
}
-
-
+
+ /**
+ * The HdfsBlockLocation includes a LocatedBlock which contains messages
+ * for issuing more detailed queries to datanodes about a block, but these
+ * messages are useless during job submission currently. This method tries
+ * to exclude the LocatedBlock from HdfsBlockLocation by creating a new
+ * BlockLocation from original, reshaping the LocatedFileStatus,
+ * allowing {@link #listStatus(JobContext)} to scan more files with less
+ * memory footprint.
+ * @see BlockLocation
+ * @see org.apache.hadoop.fs.HdfsBlockLocation
+ * @param origStat The fat FileStatus.
+ * @return The FileStatus that has been shrunk.
+ */
+ public static FileStatus shrinkStatus(FileStatus origStat) {
+ if (origStat.isDirectory() || origStat.getLen() == 0 ||
+ !(origStat instanceof LocatedFileStatus)) {
+ return origStat;
+ } else {
+ BlockLocation[] blockLocations =
+ ((LocatedFileStatus)origStat).getBlockLocations();
+ BlockLocation[] locs = new BlockLocation[blockLocations.length];
+ int i = 0;
+ for (BlockLocation location : blockLocations) {
+ locs[i++] = new BlockLocation(location);
+ }
+ LocatedFileStatus newStat = new LocatedFileStatus(origStat, locs);
+ return newStat;
+ }
+ }
+
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
diff --git
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
index 3897a9b..ca30bf3 100644
---
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
+++
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -32,11 +32,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -238,6 +244,50 @@ public class TestFileInputFormat {
}
}
+ @Test
+ public void testShrinkStatus() throws IOException {
+ Configuration conf = getConfiguration();
+ MockFileSystem mockFs =
+ (MockFileSystem) new Path("test:///").getFileSystem(conf);
+ Path dir1 = new Path("test:/a1");
+ RemoteIterator<LocatedFileStatus> statuses =
mockFs.listLocatedStatus(dir1);
+ boolean verified = false;
+ while (statuses.hasNext()) {
+ LocatedFileStatus orig = statuses.next();
+ LocatedFileStatus shrink =
+ (LocatedFileStatus)FileInputFormat.shrinkStatus(orig);
+ Assert.assertTrue(orig.equals(shrink));
+ if (shrink.getBlockLocations() != null) {
+ Assert.assertEquals(orig.getBlockLocations().length,
+ shrink.getBlockLocations().length);
+ for (int i = 0; i < shrink.getBlockLocations().length; i++) {
+ verified = true;
+ BlockLocation location = shrink.getBlockLocations()[i];
+ BlockLocation actual = orig.getBlockLocations()[i];
+ Assert.assertNotNull(((HdfsBlockLocation)actual).getLocatedBlock());
+ Assert.assertEquals(BlockLocation.class.getName(),
+ location.getClass().getName());
+ Assert.assertArrayEquals(actual.getHosts(), location.getHosts());
+ Assert.assertArrayEquals(actual.getCachedHosts(),
+ location.getCachedHosts());
+ Assert.assertArrayEquals(actual.getStorageIds(),
+ location.getStorageIds());
+ Assert.assertArrayEquals(actual.getStorageTypes(),
+ location.getStorageTypes());
+ Assert.assertArrayEquals(actual.getTopologyPaths(),
+ location.getTopologyPaths());
+ Assert.assertArrayEquals(actual.getNames(), location.getNames());
+ Assert.assertEquals(actual.getLength(), location.getLength());
+ Assert.assertEquals(actual.getOffset(), location.getOffset());
+ Assert.assertEquals(actual.isCorrupt(), location.isCorrupt());
+ }
+ } else {
+ Assert.assertTrue(orig.getBlockLocations() == null);
+ }
+ }
+ Assert.assertTrue(verified);
+ }
+
public static List<Path> configureTestSimple(Configuration conf, FileSystem
localFs)
throws IOException {
Path base1 = new Path(TEST_ROOT_DIR, "input1");
@@ -437,10 +487,31 @@ public class TestFileInputFormat {
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
long len)
throws IOException {
- return new BlockLocation[] {
- new BlockLocation(new String[] { "localhost:9866", "otherhost:9866"
},
- new String[] { "localhost", "otherhost" }, new String[] {
"localhost" },
- new String[0], 0, len, false) }; }
+ DatanodeInfo[] ds = new DatanodeInfo[2];
+ ds[0] = new DatanodeDescriptor(
+ new DatanodeID("127.0.0.1", "localhost", "abcd",
+ 9866, 9867, 9868, 9869));
+ ds[1] = new DatanodeDescriptor(
+ new DatanodeID("1.0.0.1", "otherhost", "efgh",
+ 9866, 9867, 9868, 9869));
+ long blockLen = len / 3;
+ ExtendedBlock b1 = new ExtendedBlock("bpid", 0, blockLen, 0);
+ ExtendedBlock b2 = new ExtendedBlock("bpid", 1, blockLen, 1);
+ ExtendedBlock b3 = new ExtendedBlock("bpid", 2, len - 2 * blockLen, 2);
+ String[] names = new String[]{ "localhost:9866", "otherhost:9866" };
+ String[] hosts = new String[]{ "localhost", "otherhost" };
+ String[] cachedHosts = {"localhost"};
+ BlockLocation loc1 = new BlockLocation(names, hosts, cachedHosts,
+ new String[0], 0, blockLen, false);
+ BlockLocation loc2 = new BlockLocation(names, hosts, cachedHosts,
+ new String[0], blockLen, blockLen, false);
+ BlockLocation loc3 = new BlockLocation(names, hosts, cachedHosts,
+ new String[0], 2 * blockLen, len - 2 * blockLen, false);
+ return new BlockLocation[]{
+ new HdfsBlockLocation(loc1, new LocatedBlock(b1, ds)),
+ new HdfsBlockLocation(loc2, new LocatedBlock(b2, ds)),
+ new HdfsBlockLocation(loc3, new LocatedBlock(b3, ds)) };
+ }
@Override
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]