olga
Thu, 08 May 2008 19:40:37 -0700
Author: olga Date: Thu May 8 19:40:10 2008 New Revision: 654669 URL: http://svn.apache.org/viewvc?rev=654669&view=rev Log: PIG-232: let valid cache statements in Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=654669&r1=654668&r2=654669&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu May 8 19:40:10 2008 @@ -289,3 +289,5 @@ PIG-232: fix for number of input records when BinaryStirage is used (acmurthy via olgan) + + PIG-232: let valid cache specifications through (acmurthy via olgan) Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=654669&r1=654668&r2=654669&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu May 8 19:40:10 2008 @@ -344,6 +344,27 @@ return elem.exists() || globMatchesFiles(elem, store); } + public static boolean isFile(String filename, PigContext context) + throws IOException { + return !isDirectory(filename, context.getDfs()); + } + + public static boolean isFile(String filename, DataStorage store) + throws IOException { + return !isDirectory(filename, store); + } + + public static boolean isDirectory(String filename, PigContext context) + throws IOException { + return isDirectory(filename, context.getDfs()); + } + + public static boolean isDirectory(String filename, DataStorage store) + throws IOException { + ElementDescriptor elem = store.asElement(filename); + return (elem instanceof ContainerDescriptor); + } + private static boolean globMatchesFiles(ElementDescriptor elem, DataStorage fs) throws IOException Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=654669&r1=654668&r2=654669&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu May 8 19:40:10 2008 @@ -3,6 +3,8 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -142,7 +144,11 @@ // Validate File file = new File(path); if (!file.exists()) { - throw new IOException("Invalid ship specification: " + path); + throw new IOException("Invalid ship specification: '" + path + + "' does not exist!"); + } else if (file.isDirectory()) { + throw new IOException("Invalid ship specification: '" + path + + "' is a directory and can't be shipped!"); } shipSpec.add(path); @@ -156,10 +162,35 @@ */ public void addPathToCache(String path) throws IOException { // Validate - if (!FileLocalizer.fileExists(path, pigContext)) { + URI pathUri = null; + URI dfsPath = null; + try { + pathUri = new URI(path); + + // Strip away the URI's _fragment_ and _query_ + dfsPath = new URI(pathUri.getScheme(), pathUri.getAuthority(), + pathUri.getPath(), null, null); + } catch (URISyntaxException urise) { throw new IOException("Invalid cache specification: " + path); } + boolean exists = false; + try { + exists = FileLocalizer.fileExists(dfsPath.toString(), pigContext); + } catch (IOException ioe) { + // Throw a better error message... + throw new IOException("Invalid cache specification: '" + dfsPath + + "' does not exist!"); + } + + if (!exists) { + throw new IOException("Invalid cache specification: '" + dfsPath + + "' does not exist!"); + } else if (FileLocalizer.isDirectory(dfsPath.toString(), pigContext)) { + throw new IOException("Invalid cache specification: '" + dfsPath + + "' is a directory and can't be cached!"); + } + cacheSpec.add(path); } Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=654669&r1=654668&r2=654669&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu May 8 19:40:10 2008 @@ -248,6 +248,79 @@ } @Test + public void testInputCacheSpecs() throws Exception { + // Can't run this without HDFS + if(execType == ExecType.LOCAL) + return; + + File input = Util.createInputFile("tmp", "", + new String[] {"A,1", "B,2", "C,3", + "D,2", "A,5", "B,5", + "C,8", "A,8", "D,8", + "A,9"}); + + // Perl script + String[] script = + new String[] { + "#!/usr/bin/perl", + "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";", + "while (<INFILE>) {", + " chomp $_;", + " print STDOUT \"$_\n\";", + " print STDERR \"STDERR: $_\n\";", + "}", + }; + // Copy the scripts to HDFS + File command1 = Util.createInputFile("script", "pl", script); + File command2 = Util.createInputFile("script", "pl", script); + String c1 = FileLocalizer.hadoopify(command1.toString(), + pigServer.getPigContext()); + String c2 = FileLocalizer.hadoopify(command2.toString(), + pigServer.getPigContext()); + + // Expected results + String[] expectedFirstFields = + new String[] {"A", "B", "C", "A", "D", "A"}; + int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9}; + Tuple[] expectedResults = + setupExpectedResults(expectedFirstFields, expectedSecondFields); + + // Pig query to run + pigServer.registerQuery( + "define CMD1 `script1.pl foo` " + + "cache ('" + c1 + "#script1.pl') " + + "input('foo' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery( + "define CMD2 `script2.pl bar` " + + "cache ('" + c2 + "#script2.pl') " + + "input('bar' using " + PigStorage.class.getName() + "(',')) " + + "stderr();"); + pigServer.registerQuery("IP = load 'file:" + input + "' using " + + PigStorage.class.getName() + "(',');"); + pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';"); + pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " + + "through CMD1;"); + pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;"); + + String output = "/pig/out"; + pigServer.deleteFile(output); + pigServer.store("OP", output, PigStorage.class.getName() + "(',')"); + + InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); + PigStorage ps = new PigStorage(","); + ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); + List<Tuple> outputs = new ArrayList<Tuple>(); + Tuple t; + while ((t = ps.getNext()) != null) { + outputs.add(t); + } + + // Run the query and check the results + Util.checkQueryOutputs(outputs.iterator(), expectedResults); + } + + @Test public void testOutputShipSpecs() throws Exception { // FIXME : this should be tested in all modes if(execType == ExecType.LOCAL)