Author: cutting
Date: Fri Sep  7 15:54:09 2007
New Revision: 573744

URL: http://svn.apache.org/viewvc?rev=573744&view=rev
Log:
HADOOP-1853.  Fix contrib/streaming to accept multiple -cacheFile
options.  Contributed by Prachi Gupta.
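For context: before this change the streaming option parser capped -cacheFile at a single occurrence, so only one file could be shipped through the distributed cache per job. A minimal sketch of the invocation this commit enables, mirroring the argv built in the new test below (the namenode address and paths are illustrative placeholders, not from this commit):

    // Sketch only: what HADOOP-1853 enables from user code.
    import org.apache.hadoop.streaming.StreamJob;

    public class MultiCacheFileExample {
      public static void main(String[] args) throws Exception {
        String[] argv = new String[] {
          "-input",  "/testing-streaming/input.txt",
          "-output", "/testing-streaming/out",
          "-mapper", "xargs cat",
          "-reducer", "cat",
          // Two -cacheFile options; before this fix the parser kept only one.
          "-cacheFile", "hdfs://localhost:8020/testing-streaming/cache.txt#testlink",
          "-cacheFile", "hdfs://localhost:8020/testing-streaming/cache2.txt#testlink2"
        };
        new StreamJob(argv, false).go();  // mayExit == false, as in the test below
      }
    }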
Added:
    lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=573744&r1=573743&r2=573744&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Sep  7 15:54:09 2007
@@ -105,6 +105,9 @@
     length, so that it is not always zero in map tasks.
     (Thomas Friol via cutting)

+    HADOOP-1853.  Fix contrib/streaming to accept multiple -cacheFile
+    options.  (Prachi Gupta via cutting)
+
   IMPROVEMENTS

     HADOOP-1779. Replace INodeDirectory.getINode() by a getExistingPathINodes()

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?rev=573744&r1=573743&r2=573744&view=diff
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Fri Sep  7 15:54:09 2007
@@ -492,6 +492,7 @@
   <target name="test-contrib" depends="compile-core, compile-core-test">
     <subant target="test">
+      <property name="version" value="${version}"/>
       <fileset file="${contrib.dir}/build.xml"/>
     </subant>
   </target>
@@ -746,6 +747,7 @@
   <!-- ================================================================== -->
   <target name="deploy-contrib" depends="compile-core">
     <subant target="deploy">
+      <property name="version" value="${version}"/>
       <fileset file="src/contrib/build.xml"/>
     </subant>
   </target>

Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=573744&r1=573743&r2=573744&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Fri Sep  7 15:54:09 2007
@@ -396,7 +396,7 @@
     Option inputreader = createOption("inputreader",
                                       "Optional.", "spec", 1, false);
     Option cacheFile = createOption("cacheFile",
-                                    "File name URI", "fileNameURI", 1, false);
+                                    "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
     Option cacheArchive = createOption("cacheArchive",
                                        "File name URI", "fileNameURI", 1, false);
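The one-line StreamJob change above raises the option's maximum argument count from 1 to Integer.MAX_VALUE, so the parser retains every -cacheFile occurrence instead of only the first. A plain-Java sketch of that semantics; this is not the actual Commons CLI machinery StreamJob delegates to, and the class and method names are hypothetical:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative only: collect every -cacheFile value from argv, in order,
    // the way a parser with an unbounded maximum argument count would.
    public class CacheFileArgs {
      static List<String> collectCacheFiles(String[] argv) {
        List<String> uris = new ArrayList<String>();
        for (int i = 0; i < argv.length - 1; i++) {
          if ("-cacheFile".equals(argv[i])) {
            uris.add(argv[++i]);  // keep each URI; a max of 1 would stop here
          }
        }
        return uris;
      }

      public static void main(String[] args) {
        String[] argv = { "-cacheFile", "hdfs://nn/cache/a.txt#linkA",
                          "-cacheFile", "hdfs://nn/cache/b.txt#linkB" };
        // Prints both URIs; with the old cap of one, the second was lost.
        System.out.println(collectCacheFiles(argv));
      }
    }

The collected URIs are then handed on to the distributed cache machinery, which is what the new test below verifies end to end.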
Added: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java?rev=573744&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java (added)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestMultipleCachefiles.java Fri Sep  7 15:54:09 2007
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+
+/**
+ * This test case tests the symlink creation
+ * utility provided by distributed caching
+ */
+public class TestMultipleCachefiles extends TestCase
+{
+  String INPUT_FILE = "/testing-streaming/input.txt";
+  String OUTPUT_DIR = "/testing-streaming/out";
+  String CACHE_FILE = "/testing-streaming/cache.txt";
+  String CACHE_FILE_2 = "/testing-streaming/cache2.txt";
+  String input = "check to see if we can read this none reduce";
+  String map = "xargs cat ";
+  String reduce = "cat";
+  String mapString = "testlink";
+  String mapString2 = "testlink2";
+  String cacheString = "This is just the cache string";
+  String cacheString2 = "This is just the second cache string";
+  StreamJob job;
+
+  public TestMultipleCachefiles() throws IOException
+  {
+  }
+
+  public void testMultipleCachefiles()
+  {
+    try {
+      boolean mayExit = false;
+      MiniMRCluster mr = null;
+      MiniDFSCluster dfs = null;
+      FileSystem fileSys = null;
+      try{
+        Configuration conf = new Configuration();
+        dfs = new MiniDFSCluster(conf, 1, true, null);
+        fileSys = dfs.getFileSystem();
+        String namenode = fileSys.getName();
+        mr  = new MiniMRCluster(1, namenode, 3);
+        // During tests, the default Configuration will use a local mapred
+        // So don't specify -config or -cluster
+        String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
+        String strNamenode = "fs.default.name=" + namenode;
+        String argv[] = new String[] {
+          "-input", INPUT_FILE,
+          "-output", OUTPUT_DIR,
+          "-mapper", map,
+          "-reducer", reduce,
+          //"-verbose",
+          //"-jobconf", "stream.debug=set"
+          "-jobconf", strNamenode,
+          "-jobconf", strJobtracker,
+          "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+          "-jobconf", "mapred.child.java.opts=-Dcontrib.name=" + System.getProperty("contrib.name") + " " +
+                      "-Dbuild.test=" + System.getProperty("build.test") + " " +
+                      conf.get("mapred.child.java.opts",""),
+          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#" + mapString,
+          "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE_2 + "#" + mapString2
+        };
+
+        fileSys.delete(new Path(OUTPUT_DIR));
+
+        DataOutputStream file = fileSys.create(new Path(INPUT_FILE));
+        file.writeBytes(mapString + "\n");
+        file.writeBytes(mapString2 + "\n");
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE));
+        file.writeBytes(cacheString);
+        file.close();
+        file = fileSys.create(new Path(CACHE_FILE_2));
+        file.writeBytes(cacheString2);
+        file.close();
+
+        job = new StreamJob(argv, mayExit);
+        job.go();
+        String line = null;
+        String line2 = null;
+        Path[] fileList = fileSys.listPaths(new Path(OUTPUT_DIR));
+        for (int i = 0; i < fileList.length; i++){
+          System.out.println(fileList[i].toString());
+          BufferedReader bread =
+            new BufferedReader(new InputStreamReader(fileSys.open(fileList[i])));
+          line = bread.readLine();
+          System.out.println(line);
+          line2 = bread.readLine();
+          System.out.println(line2);
+        }
+        assertEquals(cacheString + "\t", line);
+        assertEquals(cacheString2 + "\t", line2);
+      } finally{
+        if (fileSys != null) { fileSys.close(); }
+        if (dfs != null) { dfs.shutdown(); }
+        if (mr != null) { mr.shutdown();}
+      }
+    } catch(Exception e) {
+      failTrace(e);
+    }
+  }
+
+  void failTrace(Exception e)
+  {
+    StringWriter sw = new StringWriter();
+    e.printStackTrace(new PrintWriter(sw));
+    fail(sw.toString());
+  }
+
+  public static void main(String[]args) throws Exception
+  {
+    new TestMultipleCachefiles().testMultipleCachefiles();
+  }
+}
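The test leans on the "#link" fragment convention for cache URIs: the part of a -cacheFile URI after "#" names the symlink created in the task's working directory, which is why the "xargs cat" mapper can resolve the names testlink and testlink2 that it reads from the job input. A small sketch of how such a URI splits; the split itself is standard java.net.URI behavior, while the surrounding class is a hypothetical helper, not part of this commit:

    import java.net.URI;
    import java.net.URISyntaxException;

    // Illustrative only: separate the HDFS path to cache from the symlink
    // name that the task runtime will create for it.
    public class CacheLink {
      public static void main(String[] args) throws URISyntaxException {
        URI u = new URI("hdfs://localhost:8020/testing-streaming/cache.txt#testlink");
        System.out.println("file to cache: " + u.getPath());     // /testing-streaming/cache.txt
        System.out.println("symlink name:  " + u.getFragment()); // testlink
      }
    }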