Author: cdouglas
Date: Tue Dec 16 02:51:36 2008
New Revision: 727016
URL: http://svn.apache.org/viewvc?rev=727016&view=rev
Log:
HADOOP-4458. Add a test creating symlinks in the working directory. Contributed
by Amareshwari Sriramadasu.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=727016&r1=727015&r2=727016&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Dec 16 02:51:36 2008
@@ -431,6 +431,9 @@
HADOOP-4845. Modify the reduce input byte counter to record only the
compressed size and add a human-readable label. (Yongqiang He via cdouglas)
+ HADOOP-4458. Add a test creating symlinks in the working directory.
+ (Amareshwari Sriramadasu via cdouglas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=727016&r1=727015&r2=727016&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/MRCaching.java Tue Dec 16 02:51:36 2008
@@ -110,6 +110,55 @@
}
/**
+ * Using the wordcount example and adding caching to it. The cache
+ * archives/files are set and then are checked in the map if they have been
+ * symlinked or not.
+ */
+ public static class MapClass2 extends MapClass {
+
+ JobConf conf;
+
+ public void configure(JobConf jconf) {
+ conf = jconf;
+ try {
+ // read the cached files (unzipped, unjarred and text)
+ // and put it into a single file TEST_ROOT_DIR/test.txt
+ String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
+ Path file = new Path("file:///", TEST_ROOT_DIR);
+ FileSystem fs = FileSystem.getLocal(conf);
+ if (!fs.mkdirs(file)) {
+ throw new IOException("Mkdirs failed to create " + file.toString());
+ }
+ Path fileOut = new Path(file, "test.txt");
+ fs.delete(fileOut, true);
+ DataOutputStream out = fs.create(fileOut);
+ String[] symlinks = new String[6];
+ symlinks[0] = ".";
+ symlinks[1] = "testjar";
+ symlinks[2] = "testzip";
+ symlinks[3] = "testtgz";
+ symlinks[4] = "testtargz";
+ symlinks[5] = "testtar";
+
+ for (int i = 0; i < symlinks.length; i++) {
+ // read out the files from these archives
+ File f = new File(symlinks[i]);
+ File txt = new File(f, "test.txt");
+ FileInputStream fin = new FileInputStream(txt);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fin));
+ String str = reader.readLine();
+ reader.close();
+ out.writeBytes(str);
+ out.writeBytes("\n");
+ }
+ out.close();
+ } catch (IOException ie) {
+ System.out.println(StringUtils.stringifyException(ie));
+ }
+ }
+ }
+
+ /**
* A reducer class that just emits the sum of the input values.
*/
public static class ReduceClass extends MapReduceBase
@@ -135,9 +184,40 @@
}
}
+ static void setupCache(String cacheDir, FileSystem fs)
+ throws IOException {
+ Path localPath = new Path("build/test/cache");
+ Path txtPath = new Path(localPath, new Path("test.txt"));
+ Path jarPath = new Path(localPath, new Path("test.jar"));
+ Path zipPath = new Path(localPath, new Path("test.zip"));
+ Path tarPath = new Path(localPath, new Path("test.tgz"));
+ Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
+ Path tarPath2 = new Path(localPath, new Path("test.tar"));
+ Path cachePath = new Path(cacheDir);
+ fs.delete(cachePath, true);
+ if (!fs.mkdirs(cachePath)) {
+ throw new IOException("Mkdirs failed to create " + cachePath.toString());
+ }
+ fs.copyFromLocalFile(txtPath, cachePath);
+ fs.copyFromLocalFile(jarPath, cachePath);
+ fs.copyFromLocalFile(zipPath, cachePath);
+ fs.copyFromLocalFile(tarPath, cachePath);
+ fs.copyFromLocalFile(tarPath1, cachePath);
+ fs.copyFromLocalFile(tarPath2, cachePath);
+ }
+
+ public static TestResult launchMRCache(String indir,
+ String outdir, String cacheDir,
+ JobConf conf, String input)
+ throws IOException {
+ setupCache(cacheDir, FileSystem.get(conf));
+ return launchMRCache(indir,outdir, cacheDir, conf, input, false);
+ }
+
public static TestResult launchMRCache(String indir,
String outdir, String cacheDir,
- JobConf conf, String input)
+ JobConf conf, String input,
+ boolean withSymlink)
throws IOException {
String TEST_ROOT_DIR = new
Path(System.getProperty("test.build.data","/tmp"))
.toString().replace(' ', '+');
@@ -163,7 +243,6 @@
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
- conf.setMapperClass(MRCaching.MapClass.class);
conf.setCombinerClass(MRCaching.ReduceClass.class);
conf.setReducerClass(MRCaching.ReduceClass.class);
FileInputFormat.setInputPaths(conf, inDir);
@@ -171,38 +250,29 @@
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
conf.setSpeculativeExecution(false);
- Path localPath = new Path("build/test/cache");
- Path txtPath = new Path(localPath, new Path("test.txt"));
- Path jarPath = new Path(localPath, new Path("test.jar"));
- Path zipPath = new Path(localPath, new Path("test.zip"));
- Path tarPath = new Path(localPath, new Path("test.tgz"));
- Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
- Path tarPath2 = new Path(localPath, new Path("test.tar"));
- Path cachePath = new Path(cacheDir);
- fs.delete(cachePath, true);
- if (!fs.mkdirs(cachePath)) {
- throw new IOException("Mkdirs failed to create " + cachePath.toString());
+ URI[] uris = new URI[6];
+ if (!withSymlink) {
+ conf.setMapperClass(MRCaching.MapClass.class);
+ uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
+ uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
+ uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
+ uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
+ uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
+ uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
+ } else {
+ DistributedCache.createSymlink(conf);
+ conf.setMapperClass(MRCaching.MapClass2.class);
+ uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
+ uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
+ uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
+ uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
+ uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
+ uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
+ }
+ DistributedCache.addCacheFile(uris[0], conf);
+ for (int i = 1; i < 6; i++) {
+ DistributedCache.addCacheArchive(uris[i], conf);
}
- fs.copyFromLocalFile(txtPath, cachePath);
- fs.copyFromLocalFile(jarPath, cachePath);
- fs.copyFromLocalFile(zipPath, cachePath);
- fs.copyFromLocalFile(tarPath, cachePath);
- fs.copyFromLocalFile(tarPath1, cachePath);
- fs.copyFromLocalFile(tarPath2, cachePath);
- // setting the cached archives to zip, jar and simple text files
- URI uri1 = fs.getUri().resolve(cachePath + "/test.jar");
- URI uri2 = fs.getUri().resolve(cachePath + "/test.zip");
- URI uri3 = fs.getUri().resolve(cachePath + "/test.txt");
- URI uri4 = fs.getUri().resolve(cachePath + "/test.tgz");
- URI uri5 = fs.getUri().resolve(cachePath + "/test.tar.gz");
- URI uri6 = fs.getUri().resolve(cachePath + "/test.tar");
-
- DistributedCache.addCacheArchive(uri1, conf);
- DistributedCache.addCacheArchive(uri2, conf);
- DistributedCache.addCacheFile(uri3, conf);
- DistributedCache.addCacheArchive(uri4, conf);
- DistributedCache.addCacheArchive(uri5, conf);
- DistributedCache.addCacheArchive(uri6, conf);
RunningJob job = JobClient.runJob(conf);
int count = 0;
// after the job ran check to see if the input from the localized cache
Modified: hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?rev=727016&r1=727015&r2=727016&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (original)
+++ hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Tue Dec 16 02:51:36 2008
@@ -41,13 +41,22 @@
dfs = new MiniDFSCluster(conf, 1, true, null);
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(2, fileSys.getName(), 4);
+ MRCaching.setupCache("/cachedir", fileSys);
// run the wordcount example with caching
TestResult ret = MRCaching.launchMRCache("/testing/wc/input",
"/testing/wc/output",
"/cachedir",
mr.createJobConf(),
"The quick brown fox\nhas many silly\n"
- + "red fox sox\n");
+ + "red fox sox\n", false);
+ assertTrue("Archives not matching", ret.isOutputOk);
+ // launch MR cache with symlinks
+ ret = MRCaching.launchMRCache("/testing/wc/input",
+ "/testing/wc/output",
+ "/cachedir",
+ mr.createJobConf(),
+ "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n", true);
assertTrue("Archives not matching", ret.isOutputOk);
} finally {
if (fileSys != null) {