Propchange: hadoop/common/branches/branch-0.20-security-204/src/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 10 20:52:35 2011
@@ -1,6 +1,6 @@
-/hadoop/common/branches/branch-0.20/src/mapred:829987,831184,909245,909723,960946
-/hadoop/common/branches/branch-0.20-security/src/mapred:1098837
-/hadoop/common/branches/branch-0.20-security-203/src/mapred:1096071
+/hadoop/common/branches/branch-0.20/src/mapred:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946
+/hadoop/common/branches/branch-0.20-security/src/mapred:1097202,1098837
+/hadoop/common/branches/branch-0.20-security-203/src/mapred:1096071,1097012-1099333
 /hadoop/core/branches/branch-0.19/src/mapred:713112
 /hadoop/core/trunk/src/mapred:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,758180,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607,771661,772844,772876,772884,772920,773889,776638,778962,778966,779893,781720,784661,785046,785569
 /hadoop/mapreduce/trunk/src/java:808650
Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java?rev=1101640&r1=1101639&r2=1101640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/lib/CombineFileInputFormat.java Tue May 10 20:52:35 2011
@@ -20,12 +20,12 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -73,6 +73,9 @@ public abstract class CombineFileInputFo
   // across multiple pools.
   private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
 
+  // mapping from a rack name to the set of Nodes in the rack
+  private static HashMap<String, Set<String>> rackToNodes =
+      new HashMap<String, Set<String>>();
   /**
    * Specify the maximum size (in bytes) of each split. Each split is
    * approximately equal to the specified size.
@@ -214,6 +217,8 @@ public abstract class CombineFileInputFo
     getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
                   maxSize, minSizeNode, minSizeRack, splits);
 
+    // free up rackToNodes map
+    rackToNodes.clear();
     return splits.toArray(new CombineFileSplit[splits.size()]);
   }
 
@@ -341,7 +346,7 @@ public abstract class CombineFileInputFo
         // create this split.
         if (maxSize != 0 && curSplitSize >= maxSize) {
           // create an input split and add it to the splits array
-          addCreatedSplit(job, splits, racks, validBlocks);
+          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
           createdSplit = true;
           break;
         }
@@ -360,7 +365,7 @@
       if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
         // if there is a mimimum size specified, then create a single split
         // otherwise, store these blocks into overflow data structure
-        addCreatedSplit(job, splits, racks, validBlocks);
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
       } else {
         // There were a few blocks in this rack that remained to be processed.
         // Keep them in 'overflow' block list. These will be combined later.
@@ -393,7 +398,7 @@
       // create this split.
       if (maxSize != 0 && curSplitSize >= maxSize) {
         // create an input split and add it to the splits array
-        addCreatedSplit(job, splits, racks, validBlocks);
+        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
         curSplitSize = 0;
         validBlocks.clear();
         racks.clear();
@@ -402,7 +407,7 @@
 
     // Process any remaining blocks, if any.
     if (!validBlocks.isEmpty()) {
-      addCreatedSplit(job, splits, racks, validBlocks);
+      addCreatedSplit(job, splits, getHosts(racks), validBlocks);
     }
   }
 
@@ -412,13 +417,12 @@ public abstract class CombineFileInputFo
   */
  private void addCreatedSplit(JobConf job,
                               List<CombineFileSplit> splitList,
-                              List<String> racks,
+                              List<String> locations,
                               ArrayList<OneBlockInfo> validBlocks) {
    // create an input split
    Path[] fl = new Path[validBlocks.size()];
    long[] offset = new long[validBlocks.size()];
    long[] length = new long[validBlocks.size()];
-   String[] rackLocations = racks.toArray(new String[racks.size()]);
    for (int i = 0; i < validBlocks.size(); i++) {
      fl[i] = validBlocks.get(i).onepath;
      offset[i] = validBlocks.get(i).offset;
@@ -427,7 +431,7 @@ public abstract class CombineFileInputFo
 
    // add this split to the list that is returned
    CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset,
-                                   length, rackLocations);
+                                   length, locations.toArray(new String[0]));
    splitList.add(thissplit);
  }
 
@@ -484,7 +488,9 @@ public abstract class CombineFileInputFo
           rackToBlocks.put(rack, blklist);
         }
         blklist.add(oneblock);
-      }
+        // Add this host to rackToNodes map
+        addHostToRack(oneblock.racks[j], oneblock.hosts[j]);
+      }
 
       // add this block to the node --> block map
       for (int j = 0; j < oneblock.hosts.length; j++) {
@@ -547,6 +553,23 @@ public abstract class CombineFileInputFo
     }
   }
 
+  private static void addHostToRack(String rack, String host) {
+    Set<String> hosts = rackToNodes.get(rack);
+    if (hosts == null) {
+      hosts = new HashSet<String>();
+      rackToNodes.put(rack, hosts);
+    }
+    hosts.add(host);
+  }
+
+  private static List<String> getHosts(List<String> racks) {
+    List<String> hosts = new ArrayList<String>();
+    for (String rack : racks) {
+      hosts.addAll(rackToNodes.get(rack));
+    }
+    return hosts;
+  }
+
   /**
    * Accept a path only if any one of filters given in the
    * constructor do.
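
Note on the CombineFileInputFormat.java change above: the patch records which hosts live on each rack while blocks are being bucketed, and then publishes host names (via getHosts) instead of rack path strings as the locations of each CombineFileSplit. Below is a minimal, self-contained sketch of that rack-to-hosts bookkeeping pattern, written outside the Hadoop class; the class name RackHostIndex and the main() driver are made up for illustration, only the addHostToRack/getHosts shape comes from the diff.

// Sketch only: mirrors the rack -> hosts bookkeeping added in the patch.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RackHostIndex {
  // rack name -> hosts seen on that rack (same shape as the patch's rackToNodes)
  private final Map<String, Set<String>> rackToNodes =
      new HashMap<String, Set<String>>();

  public void addHostToRack(String rack, String host) {
    Set<String> hosts = rackToNodes.get(rack);
    if (hosts == null) {
      hosts = new HashSet<String>();
      rackToNodes.put(rack, hosts);
    }
    hosts.add(host);
  }

  public List<String> getHosts(List<String> racks) {
    List<String> hosts = new ArrayList<String>();
    for (String rack : racks) {
      Set<String> nodes = rackToNodes.get(rack);
      if (nodes != null) {   // unknown rack contributes nothing
        hosts.addAll(nodes);
      }
    }
    return hosts;
  }

  public static void main(String[] args) {
    RackHostIndex index = new RackHostIndex();
    index.addHostToRack("/r1", "host1.example.com");
    index.addHostToRack("/r2", "host2.example.com");
    // A split built from blocks on /r1 and /r2 would be located on both hosts.
    System.out.println(index.getHosts(Arrays.asList("/r1", "/r2")));
  }
}

The effect, as the test changes further down confirm, is that getLocations() on a split now returns host names such as hosts2[0] rather than rack strings such as "/r2".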
Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/io/compress/TestCodec.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/io/compress/TestCodec.java?rev=1101640&r1=1101639&r2=1101640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/io/compress/TestCodec.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/io/compress/TestCodec.java Tue May 10 20:52:35 2011
@@ -19,13 +19,22 @@ package org.apache.hadoop.io.compress;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.util.Arrays;
 import java.util.Random;
+import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
 import junit.framework.TestCase;
@@ -45,8 +54,11 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressorStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibDeflater;
+import org.apache.hadoop.io.compress.zlib.BuiltInZlibInflater;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
 import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
 import org.apache.hadoop.io.compress.zlib.ZlibFactory;
@@ -447,4 +459,156 @@ public class TestCodec extends TestCase
     super(name);
   }
 
+  public void testCodecPoolAndGzipDecompressor() {
+    // BuiltInZlibInflater should not be used as the GzipCodec decompressor.
+    // Assert that this is the case.
+
+    // Don't use native libs for this test.
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", false);
+    assertFalse("ZlibFactory is using native libs against request",
+        ZlibFactory.isNativeZlibLoaded(conf));
+
+    // This should give us a BuiltInZlibInflater.
+    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
+    assertNotNull("zlibDecompressor is null!", zlibDecompressor);
+    assertTrue("ZlibFactory returned unexpected inflator",
+        zlibDecompressor instanceof BuiltInZlibInflater);
+
+    // its createOutputStream() just wraps the existing stream in a
+    // java.util.zip.GZIPOutputStream.
+    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
+    assertTrue("Codec for .gz file is not GzipCodec",
+        codec instanceof GzipCodec);
+
+    // make sure we don't get a null decompressor
+    Decompressor codecDecompressor = codec.createDecompressor();
+    if (null == codecDecompressor) {
+      fail("Got null codecDecompressor");
+    }
+
+    // Asking the CodecPool for a decompressor for GzipCodec
+    // should not return null
+    Decompressor poolDecompressor = CodecPool.getDecompressor(codec);
+    if (null == poolDecompressor) {
+      fail("Got null poolDecompressor");
+    }
+    // return a couple decompressors
+    CodecPool.returnDecompressor(zlibDecompressor);
+    CodecPool.returnDecompressor(poolDecompressor);
+    Decompressor poolDecompressor2 = CodecPool.getDecompressor(codec);
+    if (poolDecompressor.getClass() == BuiltInGzipDecompressor.class) {
+      if (poolDecompressor == poolDecompressor2) {
+        fail("Reused java gzip decompressor in pool");
+      }
+    } else {
+      if (poolDecompressor != poolDecompressor2) {
+        fail("Did not reuse native gzip decompressor in pool");
+      }
+    }
+  }
+
+  public void testGzipCodecRead() throws IOException {
+    // Create a gzipped file and try to read it back, using a decompressor
+    // from the CodecPool.
+
+    // Don't use native libs for this test.
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", false);
+    assertFalse("ZlibFactory is using native libs against request",
+        ZlibFactory.isNativeZlibLoaded(conf));
+
+    // Ensure that the CodecPool has a BuiltInZlibInflater in it.
+    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
+    assertNotNull("zlibDecompressor is null!", zlibDecompressor);
+    assertTrue("ZlibFactory returned unexpected inflator",
+        zlibDecompressor instanceof BuiltInZlibInflater);
+    CodecPool.returnDecompressor(zlibDecompressor);
+
+    // Now create a GZip text file.
+    String tmpDir = System.getProperty("test.build.data", "/tmp/");
+    Path f = new Path(new Path(tmpDir), "testGzipCodecRead.txt.gz");
+    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(
+        new GZIPOutputStream(new FileOutputStream(f.toString()))));
+    final String msg = "This is the message in the file!";
+    bw.write(msg);
+    bw.close();
+
+    // Now read it back, using the CodecPool to establish the
+    // decompressor to use.
+    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = ccf.getCodec(f);
+    Decompressor decompressor = CodecPool.getDecompressor(codec);
+    FileSystem fs = FileSystem.getLocal(conf);
+    InputStream is = fs.open(f);
+    is = codec.createInputStream(is, decompressor);
+    BufferedReader br = new BufferedReader(new InputStreamReader(is));
+    String line = br.readLine();
+    assertEquals("Didn't get the same message back!", msg, line);
+    br.close();
+  }
+
+  private void verifyGzipFile(String filename, String msg) throws IOException {
+    BufferedReader r = new BufferedReader(new InputStreamReader(
+        new GZIPInputStream(new FileInputStream(filename))));
+    try {
+      String line = r.readLine();
+      assertEquals("Got invalid line back from " + filename, msg, line);
+    } finally {
+      r.close();
+      new File(filename).delete();
+    }
+  }
+
+  public void testGzipCodecWrite() throws IOException {
+    // Create a gzipped file using a compressor from the CodecPool,
+    // and try to read it back via the regular GZIPInputStream.
+
+    // Don't use native libs for this test.
+    Configuration conf = new Configuration();
+    conf.setBoolean("hadoop.native.lib", false);
+    assertFalse("ZlibFactory is using native libs against request",
+        ZlibFactory.isNativeZlibLoaded(conf));
+
+    // Ensure that the CodecPool has a BuiltInZlibDeflater in it.
+    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
+    assertNotNull("zlibCompressor is null!", zlibCompressor);
+    assertTrue("ZlibFactory returned unexpected deflator",
+        zlibCompressor instanceof BuiltInZlibDeflater);
+    CodecPool.returnCompressor(zlibCompressor);
+
+    // Create a GZIP text file via the Compressor interface.
+    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
+    CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));
+    assertTrue("Codec for .gz file is not GzipCodec", codec instanceof GzipCodec);
+
+    final String msg = "This is the message we are going to compress.";
+    final String tmpDir = System.getProperty("test.build.data", "/tmp/");
+    final String fileName = new Path(new Path(tmpDir),
+        "testGzipCodecWrite.txt.gz").toString();
+
+    BufferedWriter w = null;
+    Compressor gzipCompressor = CodecPool.getCompressor(codec);
+    if (null != gzipCompressor) {
+      // If it gives us back a Compressor, we should be able to use this
+      // to write files we can then read back with Java's gzip tools.
+      OutputStream os = new CompressorStream(new FileOutputStream(fileName),
+          gzipCompressor);
+      w = new BufferedWriter(new OutputStreamWriter(os));
+      w.write(msg);
+      w.close();
+      CodecPool.returnCompressor(gzipCompressor);
+
+      verifyGzipFile(fileName, msg);
+    }
+
+    // Create a gzip text file via codec.getOutputStream().
+    w = new BufferedWriter(new OutputStreamWriter(
+        codec.createOutputStream(new FileOutputStream(fileName))));
+    w.write(msg);
+    w.close();
+
+    verifyGzipFile(fileName, msg);
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java?rev=1101640&r1=1101639&r2=1101640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/mapred/lib/TestCombineFileInputFormat.java Tue May 10 20:52:35 2011
@@ -18,11 +18,6 @@ package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
-import java.io.DataOutputStream;
-import java.util.BitSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Random;
 
 import junit.framework.TestCase;
 
@@ -30,17 +25,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -151,14 +141,14 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // create another file on 3 datanodes and 3 racks.
       dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
@@ -186,7 +176,7 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
@@ -196,14 +186,14 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // create file4 on all three racks
       Path file4 = new Path(dir4 + "/file4");
@@ -229,7 +219,7 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
@@ -239,14 +229,14 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // maximum split size is 2 blocks
       inFormat = new DummyInputFormat();
@@ -385,7 +375,7 @@ public class TestCombineFileInputFormat
       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
       assertEquals(fileSplit.getOffset(0), 0);
       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
 
       // maximum split size is 7 blocks and min is 3 blocks
       inFormat = new DummyInputFormat();
@@ -431,15 +421,15 @@ public class TestCombineFileInputFormat
       fileSplit = (CombineFileSplit) splits[0];
       assertEquals(fileSplit.getNumPaths(), 2);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r2");
+      assertEquals(fileSplit.getLocations()[0], hosts2[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits[1];
       assertEquals(fileSplit.getNumPaths(), 1);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r1");
+      assertEquals(fileSplit.getLocations()[0], hosts1[0]); // should be on r1
       fileSplit = (CombineFileSplit) splits[2];
       assertEquals(fileSplit.getNumPaths(), 6);
       assertEquals(fileSplit.getLocations().length, 1);
-      assertEquals(fileSplit.getLocations()[0], "/r3");
+      assertEquals(fileSplit.getLocations()[0], hosts3[0]); // should be on r3
     } finally {
       if (dfs != null) {
         dfs.shutdown();

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestMetricsConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestMetricsConfig.java?rev=1101640&r1=1101639&r2=1101640&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestMetricsConfig.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/metrics2/impl/TestMetricsConfig.java Tue May 10 20:52:35 2011
@@ -110,21 +110,6 @@ public class TestMetricsConfig {
   }
 
   /**
-   * Should throw if missing config files
-   */
-  @Test public void testMissingFiles() {
-    try {
-      MetricsConfig.create("JobTracker");
-    }
-    catch (MetricsConfigException e) {
-      assertTrue("expected the 'cannot locate configuration' exception",
-                 e.getMessage().startsWith("Cannot locate configuration"));
-      return;
-    }
-    fail("should've thrown");
-  }
-
-  /**
    * Test the config file load order
    * @throws Exception
    */
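
Note on the TestCodec.java additions earlier in this diff: testCodecPoolAndGzipDecompressor pins down the CodecPool behaviour that a pure-Java BuiltInGzipDecompressor is handed out as a fresh instance, while a native gzip decompressor is reused from the pool. A minimal sketch of exercising the same behaviour outside JUnit follows; it uses only calls that already appear in the diff (ZlibFactory, CompressionCodecFactory, CodecPool), and the class name GzipPoolCheck is made up.

// Sketch only: demonstrates the pooling behaviour asserted by the new test,
// assuming hadoop-core on the classpath. GzipPoolCheck is a hypothetical name.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;

public class GzipPoolCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("hadoop.native.lib", false);   // force the pure-Java path

    CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
    CompressionCodec codec = ccf.getCodec(new Path("foo.gz"));

    Decompressor first = CodecPool.getDecompressor(codec);
    CodecPool.returnDecompressor(first);
    Decompressor second = CodecPool.getDecompressor(codec);

    // With the pure-Java BuiltInGzipDecompressor the pool hands out a fresh
    // instance; with the native decompressor the same object comes back.
    boolean javaGzip = first instanceof BuiltInGzipDecompressor;
    System.out.println("java gzip decompressor: " + javaGzip
        + ", reused from pool: " + (first == second));
  }
}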
