Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java?rev=1077114&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestMRWithDistributedCache.java Fri Mar 4 03:42:25 2011 @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.filecache; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.jar.JarOutputStream; +import java.util.zip.ZipEntry; + +import junit.framework.TestCase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; + +/** + * Tests the use of the + * {@link org.apache.hadoop.mapreduce.filecache.DistributedCache} within the + * full MR flow as well as the LocalJobRunner. This ought to be part of the + * filecache package, but that package is not currently in mapred, so cannot + * depend on MR for testing. + * + * We use the distributed.* namespace for temporary files. + * + * See {@link TestMiniMRLocalFS}, {@link TestMiniMRDFSCaching}, and + * {@link MRCaching} for other tests that test the distributed cache. + * + * This test is not fast: it uses MiniMRCluster. + */ +public class TestMRWithDistributedCache extends TestCase { + private static Path TEST_ROOT_DIR = + new Path(System.getProperty("test.build.data","/tmp")); + private static Configuration conf = new Configuration(); + private static FileSystem localFs; + static { + try { + localFs = FileSystem.getLocal(conf); + } catch (IOException io) { + throw new RuntimeException("problem getting local fs", io); + } + } + + private static final Log LOG = + LogFactory.getLog(TestMRWithDistributedCache.class); + + public static class DistributedCacheChecker extends + Mapper<LongWritable, Text, NullWritable, NullWritable> { + + @Override + public void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + Path[] files = DistributedCache.getLocalCacheFiles(conf); + Path[] archives = DistributedCache.getLocalCacheArchives(conf); + FileSystem fs = LocalFileSystem.get(conf); + + // Check that 2 files and 2 archives are present + TestCase.assertEquals(2, files.length); + TestCase.assertEquals(2, archives.length); + + // Check lengths of the files + TestCase.assertEquals(1, fs.getFileStatus(files[0]).getLen()); + TestCase.assertTrue(fs.getFileStatus(files[1]).getLen() > 1); + + // Check extraction of the archive + TestCase.assertTrue(fs.exists(new Path(archives[0], + "distributed.jar.inside3"))); + TestCase.assertTrue(fs.exists(new Path(archives[1], + "distributed.jar.inside4"))); + + // Check the class loaders + LOG.info("Java Classpath: " + System.getProperty("java.class.path")); + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + // Both the file and the archive were added to classpath, so both + // should be reachable via the class loader. + TestCase.assertNotNull(cl.getResource("distributed.jar.inside2")); + TestCase.assertNotNull(cl.getResource("distributed.jar.inside3")); + TestCase.assertNull(cl.getResource("distributed.jar.inside4")); + + + // Check that the symlink for the renaming was created in the cwd; + // This only happens for real for non-local jobtrackers. + // (The symlinks exist in "localRunner/" for local Jobtrackers, + // but the user has no way to get at them. + if (!"local".equals( + context.getConfiguration().get("mapred.job.tracker"))) { + File symlinkFile = new File("distributed.first.symlink"); + TestCase.assertTrue(symlinkFile.exists()); + TestCase.assertEquals(1, symlinkFile.length()); + } + } + } + + private void testWithConf(JobConf conf) throws IOException, + InterruptedException, ClassNotFoundException, URISyntaxException { + // Create a temporary file of length 1. + Path first = createTempFile("distributed.first", "x"); + // Create two jars with a single file inside them. + Path second = + makeJar(new Path(TEST_ROOT_DIR, "distributed.second.jar"), 2); + Path third = + makeJar(new Path(TEST_ROOT_DIR, "distributed.third.jar"), 3); + Path fourth = + makeJar(new Path(TEST_ROOT_DIR, "distributed.fourth.jar"), 4); + + // Creates the Job Configuration + DistributedCache.addCacheFile( + new URI(first.toUri().toString() + "#distributed.first.symlink"), + conf); + DistributedCache.addFileToClassPath(second, conf); + DistributedCache.addArchiveToClassPath(third, conf); + DistributedCache.addCacheArchive(fourth.toUri(), conf); + DistributedCache.createSymlink(conf); + + conf.setMaxMapAttempts(1); // speed up failures + Job job = new Job(conf); + job.setMapperClass(DistributedCacheChecker.class); + job.setOutputFormatClass(NullOutputFormat.class); + FileInputFormat.setInputPaths(job, first); + + job.submit(); + assertTrue(job.waitForCompletion(false)); + } + + /** Tests using the local job runner. */ + public void testLocalJobRunner() throws Exception { + JobConf c = new JobConf(); + c.set("mapred.job.tracker", "local"); + c.set("fs.default.name", "file:///"); + testWithConf(c); + } + + /** Tests using a full MiniMRCluster. */ + public void testMiniMRJobRunner() throws Exception { + MiniMRCluster m = new MiniMRCluster(1, "file:///", 1); + try { + testWithConf(m.createJobConf()); + } finally { + m.shutdown(); + } + + } + + private Path createTempFile(String filename, String contents) + throws IOException { + Path path = new Path(TEST_ROOT_DIR, filename); + FSDataOutputStream os = localFs.create(path); + os.writeBytes(contents); + os.close(); + return path; + } + + private Path makeJar(Path p, int index) throws FileNotFoundException, + IOException { + FileOutputStream fos = new FileOutputStream(new File(p.toString())); + JarOutputStream jos = new JarOutputStream(fos); + ZipEntry ze = new ZipEntry("distributed.jar.inside" + index); + jos.putNextEntry(ze); + jos.write(("inside the jar!" + index).getBytes()); + jos.closeEntry(); + jos.close(); + return p; + } +}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077114&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 03:42:25 2011 @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.filecache; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; + +public class TestTrackerDistributedCacheManager extends TestCase { + private static final String TEST_LOCAL_DIR_PROP = "test.local.dir"; + private static String TEST_CACHE_BASE_DIR = + new Path(System.getProperty("test.build.data","/tmp/cachebasedir")) + .toString().replace(' ', '+'); + private static String TEST_ROOT_DIR = + System.getProperty("test.build.data", "/tmp/distributedcache"); + private static final int TEST_FILE_SIZE = 4 * 1024; // 4K + private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K + private Configuration conf; + private Path firstCacheFile; + private Path secondCacheFile; + + @Override + protected void setUp() throws IOException { + conf = new Configuration(); + conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT); + conf.set(TEST_LOCAL_DIR_PROP, TEST_ROOT_DIR); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile"); + secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile"); + createTempFile(firstCacheFile); + createTempFile(secondCacheFile); + } + + /** + * This is the typical flow for using the DistributedCache classes. + */ + public void testManagerFlow() throws IOException { + TrackerDistributedCacheManager manager = + new TrackerDistributedCacheManager(conf); + LocalDirAllocator localDirAllocator = + new LocalDirAllocator(TEST_LOCAL_DIR_PROP); + + // Configures a task/job with both a regular file and a "classpath" file. + Configuration subConf = new Configuration(conf); + DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); + DistributedCache.addFileToClassPath(secondCacheFile, subConf); + TrackerDistributedCacheManager.determineTimestamps(subConf); + + Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); + FileOutputStream os = new FileOutputStream(new File(jobFile.toString())); + subConf.writeXml(os); + os.close(); + + TaskDistributedCacheManager handle = + manager.newTaskDistributedCacheManager(subConf); + assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); + handle.setup(localDirAllocator, + new File(new Path(TEST_ROOT_DIR, "workdir").toString()), "distcache"); + Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); + assertNotNull(null, localCacheFiles); + assertEquals(2, localCacheFiles.length); + Path cachedFirstFile = localCacheFiles[0]; + Path cachedSecondFile = localCacheFiles[1]; + assertFileLengthEquals(firstCacheFile, cachedFirstFile); + assertFalse("Paths should be different.", + firstCacheFile.equals(cachedFirstFile)); + + assertEquals(1, handle.getClassPaths().size()); + assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0)); + + // Cleanup + handle.release(); + manager.purgeCache(); + assertFalse(pathToFile(cachedFirstFile).exists()); + } + + + /** test delete cache */ + public void testDeleteCache() throws Exception { + TrackerDistributedCacheManager manager = + new TrackerDistributedCacheManager(conf); + FileSystem localfs = FileSystem.getLocal(conf); + + manager.getLocalCache(firstCacheFile.toUri(), conf, + new Path(TEST_CACHE_BASE_DIR), null, false, + System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false); + manager.releaseCache(firstCacheFile.toUri(), conf); + //in above code,localized a file of size 4K and then release the cache + // which will cause the cache be deleted when the limit goes out. + // The below code localize another cache which's designed to + //sweep away the first cache. + manager.getLocalCache(secondCacheFile.toUri(), conf, + new Path(TEST_CACHE_BASE_DIR), null, false, + System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false); + FileStatus[] dirStatuses = localfs.listStatus( + new Path(TEST_CACHE_BASE_DIR)); + assertTrue("DistributedCache failed deleting old" + + " cache when the cache store is full.", + dirStatuses.length > 1); + } + + public void testFileSystemOtherThanDefault() throws Exception { + TrackerDistributedCacheManager manager = + new TrackerDistributedCacheManager(conf); + conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); + Path fileToCache = new Path("fakefile:///" + + firstCacheFile.toUri().getPath()); + Path result = manager.getLocalCache(fileToCache.toUri(), conf, + new Path(TEST_CACHE_BASE_DIR), null, false, System.currentTimeMillis(), + new Path(TEST_ROOT_DIR), false); + assertNotNull("DistributedCache cached file on non-default filesystem.", + result); + } + + static void createTempFile(Path p) throws IOException { + File f = new File(p.toString()); + FileOutputStream os = new FileOutputStream(f); + byte[] toWrite = new byte[TEST_FILE_SIZE]; + new Random().nextBytes(toWrite); + os.write(toWrite); + os.close(); + FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE); + } + + @Override + protected void tearDown() throws IOException { + new File(firstCacheFile.toString()).delete(); + new File(secondCacheFile.toString()).delete(); + } + + private void assertFileLengthEquals(Path a, Path b) + throws FileNotFoundException { + assertEquals("File sizes mismatch.", + pathToFile(a).length(), pathToFile(b).length()); + } + + private File pathToFile(Path p) { + return new File(p.toString()); + } +}
