olga
Thu, 06 Mar 2008 17:10:12 -0800
Author: olga Date: Thu Mar 6 17:09:44 2008 New Revision: 634492 URL: http://svn.apache.org/viewvc?rev=634492&view=rev Log: PIG-129: making sure that temp files are stored in task's home dir and cleaned up Modified: incubator/pig/trunk/CHANGES.txt incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Modified: incubator/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=634492&r1=634491&r2=634492&view=diff ============================================================================== --- incubator/pig/trunk/CHANGES.txt (original) +++ incubator/pig/trunk/CHANGES.txt Thu Mar 6 17:09:44 2008 @@ -160,3 +160,6 @@ PIG-94: M1 for streaming: maps and reduce side support with default (de)serializer (acmurthy via olgan) + + PIG-129: making sure that temp files are stored in task's home dir and + cleaned up Modified: incubator/pig/trunk/src/org/apache/pig/data/DataBag.java URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataBag.java?rev=634492&r1=634491&r2=634492&view=diff ============================================================================== --- incubator/pig/trunk/src/org/apache/pig/data/DataBag.java (original) +++ incubator/pig/trunk/src/org/apache/pig/data/DataBag.java Thu Mar 6 17:09:44 2008 @@ -30,7 +30,8 @@ import org.apache.pig.impl.util.Spillable; import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A collection of Tuples. A DataBag may or may not fit into memory. @@ -67,6 +68,9 @@ * must be chosen up front, there is no way to convert a bag on the fly. */ public abstract class DataBag extends Datum implements Spillable, Iterable<Tuple> { + + private final Log log = LogFactory.getLog(getClass()); + // Container that holds the tuples. Actual object instantiated by // subclasses. protected Collection<Tuple> mContents; @@ -364,6 +368,32 @@ mSpillFiles = new ArrayList<File>(1); } + String tmpDirName= System.getProperties().getProperty("java.io.tmpdir") ; + File tmpDir = new File(tmpDirName); + + // if the directory does not exist, create it. + if (!tmpDir.exists()){ + log.info("Temporary directory doesn't exists. Trying to create: " + tmpDir.getAbsolutePath()); + // Create the directory and see if it was successful + if (tmpDir.mkdir()){ + log.info("Successfully created temporary directory: " + tmpDir.getAbsolutePath()); + } else { + // If execution reaches here, it means that we needed to create the directory but + // were not successful in doing so. + // + // If this directory is created recently then we can simply + // skip creation. This is to address a rare issue occuring in a cluster despite the + // the fact that spill() makes call to getSpillFile() in a synchronized + // block. + if (tmpDir.exists()) { + log.info("Temporary directory already exists: " + tmpDir.getAbsolutePath()); + } else { + log.error("Unable to create temporary directory: " + tmpDir.getAbsolutePath()); + throw new IOException("Unable to create temporary directory: " + tmpDir.getAbsolutePath() ); + } + } + } + File f = File.createTempFile("pigbag", null); f.deleteOnExit(); mSpillFiles.add(f);