olga
Mon, 07 Apr 2008 15:57:58 -0700
Author: olga Date: Mon Apr 7 15:57:21 2008 New Revision: 645726 URL: http://svn.apache.org/viewvc?rev=645726&view=rev Log: PIG-182: streaming hang bugfix Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Mon Apr 7 15:57:21 2008 @@ -203,3 +203,5 @@ PIG-174,180: bug fixes in streaming (arunc via olgan) PIG-181: streaming bug fixing (arunc via olgan) + + PIG-182: streaming bug fix (arunc via olgan) Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original) +++ 
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Mon Apr 7 15:57:21 2008 @@ -51,7 +51,9 @@ import org.apache.pig.data.BagFactory; import org.apache.pig.data.IndexedTuple; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.eval.EvalSpec; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.PigFile; import org.apache.pig.impl.util.JarManager; import org.apache.pig.impl.util.ObjectSerializer; @@ -144,24 +146,11 @@ funcs.addAll(pom.toReduce.getFuncs()); } - String shipFiles = - pom.properties.getProperty("pig.streaming.ship.files"); - List<String> files = new ArrayList<String>(); - if (shipFiles != null) { - String[] paths = shipFiles.split(","); - for (String path : paths) { - path = path.trim(); - if (path.length() > 0) { - files.add(path.trim()); - } - } - } - // create jobs.jar locally and pass it to hadoop File submitJarFile = File.createTempFile("Job", ".jar"); try { FileOutputStream fos = new FileOutputStream(submitJarFile); - JarManager.createJar(fos, funcs, files, pom.pigContext); + JarManager.createJar(fos, funcs, null, pom.pigContext); log.debug("Job jar size = " + submitJarFile.length()); conf.setJar(submitJarFile.getPath()); String user = System.getProperty("user.name"); @@ -219,26 +208,10 @@ conf.set("pig.storeFunc", pom.outputFileSpec.getFuncSpec()); // Setup the DistributedCache for this job - DistributedCache.createSymlink(conf); - - String cacheFiles = - pom.properties.getProperty("pig.streaming.cache.files"); - if (cacheFiles != null) { - String[] paths = cacheFiles.split(","); - - for (String path : paths) { - path = path.trim(); - if (path.length() != 0) { - URI uri = null; - try { - uri = new URI(path); - } catch (URISyntaxException ue) { - throw new IOException("Invalid cache specification, file doesn't exist: " + path); - } - DistributedCache.addCacheFile(uri, conf); - } - } - } + 
setupDistributedCache(pom.pigContext, conf, pom.properties, + "pig.streaming.ship.files", true); + setupDistributedCache(pom.pigContext, conf, pom.properties, + "pig.streaming.cache.files", false); //TODO - Remove this conf.setBoolean("keep.failed.task.files", true); @@ -396,6 +369,58 @@ throws IOException { for (Map.Entry property : pom.properties.entrySet()) { job.set((String)property.getKey(), (String)property.getValue()); + } + } + + private static void setupDistributedCache(PigContext pigContext, + Configuration conf, + Properties properties, String key, + boolean shipToCluster) + throws IOException { + // Turn on the symlink feature + DistributedCache.createSymlink(conf); + + // Set up the DistributedCache for this job + String fileNames = properties.getProperty(key); + if (fileNames != null) { + String[] paths = fileNames.split(","); + + for (String path : paths) { + path = path.trim(); + if (path.length() != 0) { + Path src = new Path(path); + + // Ensure that 'src' is a valid URI + URI srcURI = null; + try { + srcURI = new URI(src.toString()); + } catch (URISyntaxException ue) { + throw new IOException("Invalid cache specification, " + + "file doesn't exist: " + src); + } + + // Ship it to the cluster if necessary and add to the + // DistributedCache + if (shipToCluster) { + Path dst = + new Path(FileLocalizer.getTemporaryPath(null, pigContext).toString()); + FileSystem fs = dst.getFileSystem(conf); + fs.copyFromLocalFile(src, dst); + + // Construct the dst#srcName uri for DistributedCache + URI dstURI = null; + try { + dstURI = new URI(dst.toString() + "#" + src.getName()); + } catch (URISyntaxException ue) { + throw new IOException("Invalid ship specification, " + + "file doesn't exist: " + dst); + } + DistributedCache.addCacheFile(dstURI, conf); + } else { + DistributedCache.addCacheFile(srcURI, conf); + } + } + } } } Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original) +++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Mon Apr 7 15:57:21 2008 @@ -162,9 +162,11 @@ evalPipe.add(t); } catch (Throwable tr) { log.error(tr); - RuntimeException exp = new RuntimeException(tr.getMessage()); - exp.setStackTrace(tr.getStackTrace()); - throw exp; + + // Convert to IOException to ensure Hadoop handles it correctly ... + IOException ioe = new IOException(tr.getMessage()); + ioe.setStackTrace(tr.getStackTrace()); + throw ioe; } } @@ -197,8 +199,17 @@ * Nothing happens here. */ public void close() throws IOException { + try { if (evalPipe!=null) evalPipe.finishPipe(); + } catch (Throwable t) { + log.error(t); + + // Convert to IOException to ensure Hadoop handles it correctly ... + IOException ioe = new IOException(t.getMessage()); + ioe.setStackTrace(t.getStackTrace()); + throw ioe; + } } public static PigContext getPigContext() { @@ -308,13 +319,23 @@ } public void closeSideFiles(){ + IOException ioe = null; for (PigRecordWriter writer: sideFileWriters){ try{ writer.close(reporter); }catch(IOException e){ log.error(e); + + // Save the first IOException which occurred ... 
+ if (ioe == null) { + ioe = e; + } } } + + if (ioe != null) { + throw new RuntimeException(ioe); + } } class MapDataOutputCollector extends DataCollector { @@ -340,9 +361,26 @@ @Override public void finish(){ - closeSideFiles(); - if (group != null) - group.finish(); + try { + closeSideFiles(); + + if (group != null) { + group.finish(); + group = null; + } + } catch (Exception e) { + try { + if (group != null) { + group.finish(); + group = null; + } + } catch (Exception innerE) { + log.warn("Failed to cleanup groups with: " + innerE); + } + + // Propagate the original exception + throw new RuntimeException(e); + } } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java Mon Apr 7 15:57:21 2008 @@ -129,17 +129,22 @@ public final void finishPipe() { try { finish(); - } finally { + + if (successor != null) { + successor.finishPipe(); + successor = null; + } + } catch (Exception e) { try { if (successor != null) { successor.finishPipe(); } - } catch (Exception e) { + } catch (Exception ignored) { // Ignore this exception since the original is more relevant - LOG.debug(e); - } finally { - successor = null; + LOG.debug(ignored); } + successor = null; + throw new RuntimeException(e); } } Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== 
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Mon Apr 7 15:57:21 2008 @@ -81,8 +81,19 @@ LoadFunc inputLoader = (LoadFunc)PigContext.instantiateFuncFromSpec( loadFileSpec.getFuncSpec()); + + // Check if both LoadFunc objects belong to the same type + boolean sameType = false; + try { + streamLoader.getClass().cast(inputLoader); + sameType = true; + } catch (ClassCastException cce) { + sameType = false; + } - if (streamLoader.equals(inputLoader)) { + // Check if both LoadFunc objects belong to the same type and + // are equivalent + if (sameType && streamLoader.equals(inputLoader)) { // Since they both are the same, we can flip them // for BinaryStorage load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName())); Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Mon Apr 7 15:57:21 2008 @@ -107,7 +107,19 @@ StoreFunc outputStorer = (StoreFunc)PigContext.instantiateFuncFromSpec( storeFileSpec.getFuncSpec()); - if (streamStorer.equals(outputStorer)) { + + // Check if both LoadFunc objects belong to the same type + boolean sameType = false; + try { + streamStorer.getClass().cast(outputStorer); + sameType = true; + } catch (ClassCastException cce) { + sameType = false; + } + + // Check if both LoadFunc objects belong to the same type and + // are equivalent + if 
(sameType && streamStorer.equals(outputStorer)) { // Since they both are the same, we can flip them // for BinaryStorage s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName())); Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Apr 7 15:57:21 2008 @@ -251,59 +251,66 @@ private static final String PERL = "perl"; private static final String PYTHON = "python"; private void checkAutoShipSpecs(StreamingCommand command, String[] argv) { + // Candidate for auto-ship String arg0 = argv[0]; - // Check if command is perl or python ... + + // Check if command is perl or python ... if so use the first non-option + // and non-quoted string as the candidate if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) { for (int i=1; i < argv.length; ++i) { if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) { - command.addPathToShip(argv[i]); - command.setExecutable(argv[i]); + checkAndShip(command, argv[i]); break; } } } else { // Ship the first argument if it can be ... - boolean absPath = arg0.startsWith("/"); - String filePath = (absPath) ? arg0 : which(arg0); - if (filePath.length() > 0 && checkAndShip(command, filePath)) { - // Make it relative to task's cwd - String runtimeExecutablePath = (absPath) ? ("." 
+ filePath) : filePath; - argv[0] = runtimeExecutablePath; - command.setCommandArgs(argv); - command.setExecutable(runtimeExecutablePath); - - // Ship the file - command.addPathToShip(filePath); - } + checkAndShip(command, arg0); } } + private void checkAndShip(StreamingCommand command, String arg) { + // Don't auto-ship if it is an absolute path... + if (arg.startsWith("/")) { + return; + } + + // $ which arg + String argPath = which(arg); + if (argPath != null && !inSkipPaths(argPath)) { + command.addPathToShip(argPath); + } + + } + private boolean isQuotedString(String s) { return (s.charAt(0) == '\'' && s.charAt(s.length()-1) == '\''); } - private boolean checkAndShip(StreamingCommand command, String file) { - // Check if file is in the paths to be skipped + // Check if file is in the list paths to be skipped + private boolean inSkipPaths(String file) { for (String skipPath : pigContext.getPathsToSkip()) { if (file.startsWith(skipPath)) { - return false; + return true; } } - return true; + return false; } private static String which(String file) { try { - ProcessBuilder processBuilder = new ProcessBuilder(new String[] {"which", file}); + ProcessBuilder processBuilder = + new ProcessBuilder(new String[] {"which", file}); Process process = processBuilder.start(); - BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream())); + BufferedReader stdout = + new BufferedReader(new InputStreamReader(process.getInputStream())); String fullPath = stdout.readLine(); - return (process.waitFor() == 0) ? fullPath : ""; + return (process.waitFor() == 0) ? 
fullPath : null; } catch (Exception e) {} - return ""; + return null; } private static final char SINGLE_QUOTE = '\''; @@ -1505,6 +1512,10 @@ } LogicalPlan readFrom = aliases.get(t.image); + if (readFrom == null) { + throw new ParseException("Undefined alias: " + t.image + + " used in STORE"); + } String jobOutputFile = massageFilename(fileName, pigContext); lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(), new FileSpec(jobOutputFile, functionSpec), Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Mon Apr 7 15:57:21 2008 @@ -22,9 +22,13 @@ import java.io.BufferedReader; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.commons.logging.Log; @@ -51,7 +55,9 @@ private static final Log LOG = LogFactory.getLog(ExecutableManager.class.getName()); private static final int SUCCESS = 0; - + private static final String PATH = "PATH"; + private static final String BASH = "bash"; + protected StreamingCommand command; // Streaming command to be run String[] argv; // Parsed/split commands @@ -190,21 +196,66 @@ stderrThread.interrupt(); } } - - } + /** + * Set up the run-time environment of the managed process. 
+ * + * @param pb {@link ProcessBuilder} used to exec the process + */ + protected void setupEnvironment(ProcessBuilder pb) { + String separator = ":"; + Map<String, String> env = pb.environment(); + + // Add the current-working-directory to the $PATH + File dir = pb.directory(); + String cwd = (dir != null) ? + dir.getAbsolutePath() : System.getProperty("user.dir"); + String envPath = env.get(PATH); + if (envPath == null) { + envPath = cwd; + } else { + envPath = envPath + separator + cwd; + } + env.put(PATH, envPath); + } + + /** + * Start execution of the external process. + * + * This takes care of setting up the environment of the process and also + * starts {@link ProcessErrorThread} to process the <code>stderr</code> of + * the managed process. + * + * @throws IOException + */ protected void exec() throws IOException { - // Unquote command-line arguments ... + // Set the actual command to run with 'bash -c exec ...' + List<String> cmdArgs = new ArrayList<String>(); + cmdArgs.add(BASH); + cmdArgs.add("-c"); + StringBuffer sb = new StringBuffer(); + sb.append("exec "); for (int i=0; i < argv.length; ++i) { + // Single-quote each component, however ensure that already + // quoted args are handled right + sb.append('\''); + String arg = argv[i]; if (arg.charAt(0) == '\'' && arg.charAt(arg.length()-1) == '\'') { - argv[i] = arg.substring(1, arg.length()-1); + arg = arg.substring(1, arg.length()-1); } + sb.append(arg); + + sb.append('\''); + sb.append(" "); } + cmdArgs.add(sb.toString()); // Start the external process - ProcessBuilder processBuilder = new ProcessBuilder(argv); + ProcessBuilder processBuilder = + new ProcessBuilder(cmdArgs.toArray(new String[cmdArgs.size()])); + setupEnvironment(processBuilder); process = processBuilder.start(); LOG.debug("Started the process for command: " + command); Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java URL: 
http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original) +++ incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java Mon Apr 7 15:57:21 2008 @@ -131,8 +131,10 @@ new ObjectOutputStream(jarFile).writeObject(pigContext); } - for (String file : files) { - addStream(jarFile, file, new FileInputStream(file), contents); + if (files != null) { + for (String file : files) { + addStream(jarFile, file, new FileInputStream(file), contents); + } } jarFile.close(); Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=645726&r1=645725&r2=645726&view=diff ============================================================================== --- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original) +++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Mon Apr 7 15:57:21 2008 @@ -270,7 +270,7 @@ // Pig query to run pigServer.registerQuery( - "define CMD `." + command + " foo` " + + "define CMD `" + command.getName() + " foo` " + "ship ('" + command + "') " + "input('foo' using " + PigStorage.class.getName() + "(',')) " + "stderr();"); @@ -320,7 +320,7 @@ "}", }; File command = Util.createInputFile("script", "pl", script); - + // Expected results String[] expectedFirstFields = new String[] {"A", "A", "A", "A", "A", "A"}; @@ -330,7 +330,7 @@ // Pig query to run pigServer.registerQuery( - "define CMD `." 
+ command + " foo bar` " + + "define CMD `" + command.getName() + " foo bar` " + "ship ('" + command + "') " + "output('foo' using " + PigStorage.class.getName() + "(','), " + "'bar' using " + PigStorage.class.getName() + "(',')) " + @@ -358,7 +358,7 @@ } @Test - public void testInputOutputSpecsWithAutoShip() throws Exception { + public void testInputOutputSpecs() throws Exception { PigServer pigServer = new PigServer(MAPREDUCE); File input = Util.createInputFile("tmp", "", @@ -382,7 +382,7 @@ "}", }; File command = Util.createInputFile("script", "pl", script); - + // Expected results String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"}; @@ -392,7 +392,7 @@ // Pig query to run pigServer.registerQuery( - "define CMD `." + command + " foo bar foobar` " + + "define CMD `" + command.getName() + " foo bar foobar` " + "ship ('" + command + "') " + "input('foo' using " + PigStorage.class.getName() + "(',')) " + "output('bar' using " + PigStorage.class.getName() + "(','), " +