olga
Wed, 18 Nov 2009 11:17:53 -0800
Author: olga Date: Wed Nov 18 19:17:23 2009 New Revision: 881887 URL: http://svn.apache.org/viewvc?rev=881887&view=rev Log: PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan) Modified: hadoop/pig/trunk/CHANGES.txt hadoop/pig/trunk/src/org/apache/pig/PigServer.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Modified: hadoop/pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/CHANGES.txt (original) +++ hadoop/pig/trunk/CHANGES.txt Wed Nov 18 19:17:23 2009 @@ -23,6 +23,9 @@ INCOMPATIBLE CHANGES IMPROVEMENTS + +PIG-1053: Consider moving to Hadoop for local mode (ankit.modi via olgan) + PIG-1085: Pass JobConf and UDF specific configuration information to UDFs (gates) Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Nov 18 19:17:23 2009 @@ -54,7 +54,6 @@ import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.FrontendException; -import org.apache.pig.impl.logicalLayer.LOJoin; import org.apache.pig.impl.logicalLayer.LOLoad; import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Nov 18 19:17:23 2009 @@ -18,25 +18,14 @@ package org.apache.pig.backend.hadoop.executionengine; -import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.FileOutputStream; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.PrintStream; -import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketImplFactory; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.Collection; import java.util.List; import java.util.ArrayList; -import java.util.LinkedList; import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; @@ -46,11 +35,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobTracker; -import org.apache.pig.FuncSpec; +import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.backend.datastorage.DataStorage; import org.apache.pig.backend.executionengine.ExecException; @@ -60,22 +47,14 @@ import org.apache.pig.backend.executionengine.util.ExecTools; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.datastorage.HDataStorage; -import org.apache.pig.builtin.BinStorage; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.shock.SSHSocketImplFactory; -import org.apache.pig.impl.io.FileSpec; import org.apache.pig.tools.pigstats.PigStats; public class HExecutionEngine implements ExecutionEngine { @@ -84,7 +63,7 @@ private static final String FILE_SYSTEM_LOCATION = "fs.default.name"; private final Log log = LogFactory.getLog(getClass()); - private static final String LOCAL = "local"; + public static final String LOCAL = "local"; protected PigContext pigContext; @@ -134,6 +113,7 @@ init(this.pigContext.getProperties()); } + @SuppressWarnings("deprecation") public void init(Properties properties) throws ExecException { //First set the ssh socket factory setSSHFactory(); @@ -155,19 +135,32 @@ // Now add the settings from "properties" object to override any existing properties // All of the above is accomplished in the method call below - JobConf jobConf = new JobConf(); - jobConf.addResource("pig-cluster-hadoop-site.xml"); + JobConf jobConf = null; + if( this.pigContext.getExecType() == ExecType.LOCAL ) { + // We dont load any configurations here + jobConf = new JobConf( false ); + } else { + jobConf = new JobConf(); + jobConf.addResource("pig-cluster-hadoop-site.xml"); + } //the method below alters the properties object by overriding the //hadoop properties with the values from properties and recomputing //the properties recomputeProperties(jobConf, properties); - - configuration = ConfigurationUtil.toConfiguration(properties); - properties = ConfigurationUtil.toProperties(configuration); + + // If we are running in local mode we dont read the hadoop conf file + if ( this.pigContext.getExecType() != ExecType.LOCAL ) { + configuration = ConfigurationUtil.toConfiguration(properties); + properties = ConfigurationUtil.toProperties(configuration); + } else { + properties.setProperty(JOB_TRACKER_LOCATION, LOCAL ); + properties.setProperty(FILE_SYSTEM_LOCATION, "file:///"); + } + cluster = properties.getProperty(JOB_TRACKER_LOCATION); nameNode = properties.getProperty(FILE_SYSTEM_LOCATION); - + if (cluster != null && cluster.length() > 0) { if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) { cluster = cluster + ":50020"; @@ -190,7 +183,7 @@ if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){ - log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)); + log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION)); } try { Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Nov 18 19:17:23 2009 @@ -35,6 +35,7 @@ import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; +import org.apache.pig.ExecType; import org.apache.pig.PigException; import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; @@ -298,7 +299,7 @@ // Report records and bytes written. Only do this in the single store case. Multi-store // scripts mess up the stats reporting from hadoop. List<String> rji = stats.getRootJobIDs(); - if (rji != null && rji.size() == 1 && finalStores == 1) { + if ( (rji != null && rji.size() == 1 && finalStores == 1) || pc.getExecType() == ExecType.LOCAL ) { if(stats.getRecordsWritten()==-1) { log.info("Records written : Unable to determine number of records written"); } else { Modified: hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/PigContext.java Wed Nov 18 19:17:23 2009 @@ -185,15 +185,6 @@ switch (execType) { case LOCAL: - { - lfs = new HDataStorage(URI.create("file:///"), - new Properties()); - - dfs = lfs; - executionEngine = new LocalExecutionEngine(this); - } - break; - case MAPREDUCE: { executionEngine = new HExecutionEngine (this); @@ -203,7 +194,7 @@ dfs = executionEngine.getDataStorage(); lfs = new HDataStorage(URI.create("file:///"), - new Properties()); + new Properties()); } break; @@ -331,11 +322,7 @@ } public DataStorage getFs() { - if(execType == ExecType.LOCAL) { - return lfs; - } else { - return dfs; - } + return dfs; } /** @@ -573,10 +560,6 @@ switch (execType) { case LOCAL: - { - executableManager = new ExecutableManager(); - } - break; case MAPREDUCE: { executableManager = new HadoopExecutableManager(); @@ -628,9 +611,7 @@ * @return error source */ public byte getErrorSource() { - if(execType == ExecType.LOCAL) { - return PigException.USER_ENVIRONMENT; - } else if (execType == ExecType.MAPREDUCE) { + if(execType == ExecType.LOCAL || execType == ExecType.MAPREDUCE) { return PigException.REMOTE_ENVIRONMENT; } else { return PigException.BUG; Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original) +++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Nov 18 19:17:23 2009 @@ -346,15 +346,6 @@ */ LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{ log.trace("Entering parseJoin"); - // Skewed Join behaves as regular join in local mode - if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.SKEWED) { - return rewriteJoin(gis,lp); - } - - // Merge Join behaves as regular join in local mode - if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.MERGE) { - return rewriteJoin(gis,lp); - } int n = gis.size(); @@ -1316,7 +1307,14 @@ ) | (<STORE> op = StoreClause(lp)) ) - [<PARALLEL> t2=<INTEGER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ] + [<PARALLEL> t2=<INTEGER> { + // In Local Mode we can only use one reducer + if( this.pigContext.getExecType() == ExecType.LOCAL ) { + op.setRequestedParallelism(1); + } else { + op.setRequestedParallelism(Integer.parseInt(t2.image)); + } + } ] ) {log.trace("Exiting BaseExpr"); return op;} } Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original) +++ hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Wed Nov 18 19:17:23 2009 @@ -18,8 +18,11 @@ package org.apache.pig.tools.pigstats; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; @@ -41,7 +44,6 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter; import org.apache.pig.impl.util.ObjectSerializer; public class PigStats { @@ -54,6 +56,8 @@ ArrayList<String> rootJobIDs = new ArrayList<String>(); ExecType mode; + private static final String localModeDataFile = "part-00000"; + public void setMROperatorPlan(MROperPlan mrp) { this.mrp = mrp; } @@ -99,11 +103,25 @@ //The counter placed before a store in the local plan should be able to get the number of records for(PhysicalOperator op : php.getLeaves()) { Map<String, String> jobStats = new HashMap<String, String>(); - stats.put(op.toString(), jobStats); - POCounter counter = (POCounter) php.getPredecessors(op).get(0); - jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(counter.getCount())).toString()); + stats.put(op.toString(), jobStats); String localFilePath=normalizeToLocalFilePath(((POStore)op).getSFile().getFileName()); - jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(new File(localFilePath).length())).toString()); + File outputFile = new File( localFilePath + File.separator + localModeDataFile ); + + long lineCounter = 0; + try { + BufferedReader in = new BufferedReader(new FileReader( outputFile )); + @SuppressWarnings("unused") + String tmpString = null; + while( (tmpString = in.readLine()) != null ) { + lineCounter++; + } + in.close(); + } catch (FileNotFoundException e) { + } catch (IOException e) { + } finally { + jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(lineCounter)).toString()); + } + jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(outputFile.length())).toString()); } return stats; } @@ -266,10 +284,10 @@ } public long getBytesWritten() { - if(mode == ExecType.LOCAL) { - return getLocalBytesWritten(); - } else if(mode == ExecType.MAPREDUCE) { - return getMapReduceBytesWritten(); + if(mode == ExecType.LOCAL) { + return getLocalBytesWritten(); + } else if( mode == ExecType.MAPREDUCE ) { + return getMapReduceBytesWritten(); } else { throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected."); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Nov 18 19:17:23 2009 @@ -538,11 +538,13 @@ File out = File.createTempFile("output", ".txt"); out.delete(); PigServer pigServer = new PigServer("local"); + // FileLocalizer is initialized before using HDFS by previous tests + FileLocalizer.setInitialized(false); pigServer.registerQuery("a = load '" + Util.encodeEscape(file.toString()) + "';"); pigServer.registerQuery("b = order a by $0;"); pigServer.registerQuery("c = group b by $0;"); pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);"); - PigStats pigStats = pigServer.store("d", out.getAbsolutePath()).getStatistics(); + PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics(); InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs()); long filesize = 0; while(is.read() != -1) filesize++; @@ -552,8 +554,8 @@ //Map<String, Map<String, String>> stats = pigStats.getPigStats(); - assertEquals(count, pigStats.getRecordsWritten()); - assertEquals(filesize, pigStats.getBytesWritten()); + assertEquals(10, pigStats.getRecordsWritten()); + assertEquals(110, pigStats.getBytesWritten()); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Wed Nov 18 19:17:23 2009 @@ -78,12 +78,13 @@ t = it.next(); count[i] = (Long)t.get(0); } - + Assert.assertFalse(it.hasNext()); - Assert.assertEquals(3L, count[0]); + // Pig's previous local mode was screwed up correcting that + Assert.assertEquals(5L, count[0]); Assert.assertEquals(5L, count[1]); - Assert.assertEquals(5L, count[2]); + Assert.assertEquals(3L, count[2]); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Wed Nov 18 19:17:23 2009 @@ -2182,5 +2182,5 @@ Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>(); Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>(); Map<String, String> fileNameMap = new HashMap<String, String>(); - PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties()); + PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties()); } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java Wed Nov 18 19:17:23 2009 @@ -32,11 +32,6 @@ import org.apache.pig.PigException; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob; -import org.apache.pig.backend.executionengine.util.ExecTools; -import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.logicalLayer.LogicalPlan; @@ -86,7 +81,7 @@ LogicalPlan lp = checkLogicalPlan(1, 2, 9); // XXX Physical plan has one less node in the local case - PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12); + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11); Assert.assertTrue(executePlan(pp)); @@ -186,7 +181,7 @@ LogicalPlan lp = checkLogicalPlan(1, 3, 14); - PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17); + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14); Assert.assertTrue(executePlan(pp)); @@ -248,7 +243,7 @@ LogicalPlan lp = checkLogicalPlan(2, 3, 16); // XXX the total number of ops is one less in the local case - PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21); + PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19); Assert.assertTrue(executePlan(pp)); @@ -459,7 +454,7 @@ myPig.registerQuery("store c into '/tmp/output5';"); LogicalPlan lp = checkLogicalPlan(1, 3, 12); - PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19); + PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15); myPig.executeBatch(); myPig.discardBatch(); @@ -536,7 +531,7 @@ private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots, int expectedLeaves, int expectedSize) throws IOException { - System.out.println("===== check physical plan ====="); + System.out.println("===== check physical plan ====="); PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile( lp, null); @@ -565,16 +560,38 @@ } private void deleteOutputFiles() { - try { - FileLocalizer.delete("/tmp/output1", myPig.getPigContext()); - FileLocalizer.delete("/tmp/output2", myPig.getPigContext()); - FileLocalizer.delete("/tmp/output3", myPig.getPigContext()); - FileLocalizer.delete("/tmp/output4", myPig.getPigContext()); - FileLocalizer.delete("/tmp/output5", myPig.getPigContext()); + String outputFiles[] = { "/tmp/output1", + "/tmp/output2", + "/tmp/output3", + "/tmp/output4", + "/tmp/output5" + }; + try { + for( String outputFile : outputFiles ) { + if( isDirectory(outputFile) ) { + deleteDir( new File( outputFile ) ); + } else { + FileLocalizer.delete(outputFile, myPig.getPigContext()); + } + } } catch (IOException e) { e.printStackTrace(); Assert.fail(); } } + + private void deleteDir( File file ) { + if( file.isDirectory() && file.listFiles().length != 0 ) { + for( File innerFile : file.listFiles() ) { + deleteDir( innerFile ); + } + } + file.delete(); + } + + private boolean isDirectory( String filepath ) { + File file = new File( filepath ); + return file.isDirectory(); + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigContext.java Wed Nov 18 19:17:23 2009 @@ -46,8 +46,8 @@ public class TestPigContext extends TestCase { private static final String TMP_DIR_PROP = "/tmp/hadoop-hadoop"; - private static final String FS_NAME = "machine:9000"; - private static final String JOB_TRACKER = "machine:9001"; + private static final String FS_NAME = "file:///"; + private static final String JOB_TRACKER = "local"; private File input; private PigContext pigContext; @@ -68,7 +68,7 @@ PigServer pigServer = new PigServer(pigContext); registerAndStore(pigServer); - check_asserts(); + check_asserts(pigServer); } /** @@ -79,7 +79,7 @@ PigServer pigServer = new PigServer(ExecType.LOCAL, getProperties()); registerAndStore(pigServer); - check_asserts(); + check_asserts(pigServer); } /** @@ -91,7 +91,7 @@ PigServer pigServer = new PigServer(pigContext); registerAndStore(pigServer); - check_asserts(); + check_asserts(pigServer); } @Test @@ -220,7 +220,7 @@ } private void registerAndStore(PigServer pigServer) throws IOException { - pigServer.debugOn(); + // pigServer.debugOn(); List<String> commands = getCommands(); for (final String command : commands) { pigServer.registerQuery(command); @@ -228,9 +228,9 @@ pigServer.store("counts", input.getAbsolutePath() + ".out"); } - private void check_asserts() { - assertEquals(JOB_TRACKER, pigContext.getProperties().getProperty("mapred.job.tracker")); - assertEquals(FS_NAME, pigContext.getProperties().getProperty("fs.default.name")); - assertEquals(TMP_DIR_PROP, pigContext.getProperties().getProperty("hadoop.tmp.dir")); + private void check_asserts(PigServer pigServer) { + assertEquals(JOB_TRACKER, pigServer.getPigContext().getProperties().getProperty("mapred.job.tracker")); + assertEquals(FS_NAME, pigServer.getPigContext().getProperties().getProperty("fs.default.name")); + assertEquals(TMP_DIR_PROP, pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir")); } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Wed Nov 18 19:17:23 2009 @@ -25,6 +25,7 @@ import org.apache.pig.ExecType; import org.apache.pig.PigServer; +import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.tools.pigstats.PigStats; public class TestPigStats extends TestCase { @@ -34,19 +35,38 @@ File outputFile = null; try { outputFile = File.createTempFile("JIAR_1027", ".out"); + String filePath = outputFile.getAbsolutePath(); + outputFile.delete(); PigServer pig = new PigServer(ExecType.LOCAL); pig .registerQuery("A = load 'test/org/apache/pig/test/data/passwd';"); - PigStats stats = pig.store("A", outputFile.getAbsolutePath()) + PigStats stats = pig.store("A", filePath) .getStatistics(); - assertEquals(outputFile.length(), stats.getBytesWritten()); + File dataFile = new File( outputFile.getAbsoluteFile() + File.separator + "part-00000" ); + assertEquals(dataFile.length(), stats.getBytesWritten()); } catch (IOException e) { + e.printStackTrace(); + System.err.println( e.getMessage() ); fail("IOException happened"); } finally { if (outputFile != null) { - outputFile.delete(); + // Hadoop Local mode creates a directory + // Hence we need to delete a directory recursively + deleteDirectory(outputFile); } } } + + private void deleteDirectory( File dir ) { + File[] files = dir.listFiles(); + for( File file : files ) { + if( file.isDirectory() ) { + deleteDirectory(file); + } else { + file.delete(); + } + } + dir.delete(); + } } Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=881887&r1=881886&r2=881887&view=diff ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java (original) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java Wed Nov 18 19:17:23 2009 @@ -17,16 +17,12 @@ */ package org.apache.pig.test; -import static org.junit.Assert.assertEquals; - import java.util.*; import org.apache.pig.ExecType; -import java.io.File; import java.io.BufferedReader; import java.io.FileReader; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -35,13 +31,11 @@ import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.FuncSpec; -import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.DefaultBagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.DefaultTuple; -import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.PigServer; @@ -52,11 +46,8 @@ import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.test.utils.GenPhyOp; import org.apache.pig.test.utils.GenRandomData; import org.apache.pig.test.utils.TestHelper; @@ -65,10 +56,7 @@ import org.apache.pig.impl.logicalLayer.LogicalOperator; import org.apache.pig.impl.logicalLayer.LogicalPlan; import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder; -import org.apache.pig.backend.datastorage.ContainerDescriptor; import org.apache.pig.backend.datastorage.DataStorage; -import org.apache.pig.backend.datastorage.DataStorageException; -import org.apache.pig.backend.datastorage.ElementDescriptor; import org.junit.After; import org.junit.Before; import org.junit.Test;