pradeepkth
Thu, 19 Nov 2009 09:16:54 -0800
Author: pradeepkth Date: Thu Nov 19 17:16:21 2009 New Revision: 882208 URL: http://svn.apache.org/viewvc?rev=882208&view=rev Log: Fixes for a couple of more unit tests Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=882208&r1=882207&r2=882208&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Thu Nov 19 17:16:21 2009 @@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.pig.StoreFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.util.ObjectSerializer; /** @@ -78,9 +79,19 @@ * @return * @throws IOException */ + @SuppressWarnings("unchecked") private List<OutputCommitter> getCommitters(TaskAttemptContext context, String storeLookupKey) throws IOException { Configuration conf = context.getConfiguration(); + + // if there is a udf in the plan we would need to know the import + // path so we can instantiate the udf. 
This is required because + // we will be deserializing the POStores out of the plan in the next + // line below. The POStore in turn has a member reference to the Physical + // plan it is part of - so the deserialization goes deep and while + // deserializing the plan, the udf.import.list may be needed. + PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer. + deserialize(conf.get("udf.import.list"))); LinkedList<POStore> stores = (LinkedList<POStore>) ObjectSerializer. deserialize(conf.get(storeLookupKey)); List<OutputCommitter> committers = new ArrayList<OutputCommitter>(); Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java?rev=882208&r1=882207&r2=882208&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java (original) +++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/PigFile.java Thu Nov 19 17:16:21 2009 @@ -17,21 +17,27 @@ */ package org.apache.pig.impl.io; -import java.io.BufferedOutputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Iterator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.pig.LoadFunc; import org.apache.pig.StoreFunc; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; 
import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileLocalizer; -// XXX: FIXME: make this work with load store redesign public class PigFile { private String file = null; @@ -48,11 +54,10 @@ public DataBag load(LoadFunc lfunc, PigContext pigContext) throws IOException { DataBag content = BagFactory.getInstance().newDefaultBag(); - InputStream is = FileLocalizer.open(file, pigContext); - //XXX FIXME: make this work with new load-store redesign -// lfunc.bindTo(file, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE); + ReadToEndLoader loader = new ReadToEndLoader(lfunc, + ConfigurationUtil.toConfiguration(pigContext.getProperties()), file, 0); Tuple f = null; - while ((f = lfunc.getNext()) != null) { + while ((f = loader.getNext()) != null) { content.add(f); } return content; @@ -60,14 +65,36 @@ public void store(DataBag data, StoreFunc sfunc, PigContext pigContext) throws IOException { - BufferedOutputStream bos = new BufferedOutputStream(FileLocalizer.create(file, append, pigContext)); -// sfunc.bindTo(bos); - for (Iterator<Tuple> it = data.iterator(); it.hasNext();) { - Tuple row = it.next(); - sfunc.putNext(row); + Configuration conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); + // create a simulated JobContext + JobContext jc = new JobContext(conf, new JobID()); + OutputFormat<?,?> of = sfunc.getOutputFormat(); + PigOutputFormat.setLocation(jc, sfunc, file); + OutputCommitter oc; + // create a simulated TaskAttemptContext + TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID()); + PigOutputFormat.setLocation(tac, sfunc, file); + RecordWriter<?,?> rw ; + try { + of.checkOutputSpecs(jc); + oc = of.getOutputCommitter(tac); + oc.setupJob(jc); + oc.setupTask(tac); + rw = of.getRecordWriter(tac); + sfunc.prepareToWrite(rw); + + for (Iterator<Tuple> it = data.iterator(); it.hasNext();) { + Tuple row = it.next(); + sfunc.putNext(row); + } + rw.close(tac); + } catch 
(InterruptedException e) { + throw new IOException(e); } -// sfunc.finish(); - bos.close(); + if(oc.needsTaskCommit(tac)) { + oc.commitTask(tac); + } + oc.cleanupJob(jc); } public String toString() { Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java?rev=882208&r1=882207&r2=882208&view=diff ============================================================================== --- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline.java Thu Nov 19 17:16:21 2009 @@ -160,18 +160,21 @@ t.append(weights); b.add(t); - String fileName = "file:"+File.createTempFile("tmp", ""); + File tmpFile = File.createTempFile("tmp", ""); + tmpFile.deleteOnExit(); + String fileName = tmpFile.getAbsolutePath(); PigFile f = new PigFile(fileName); f.store(b, new BinStorage(), pigServer.getPigContext()); - pigServer.registerQuery("a = load '" + Util.encodeEscape(fileName) + "' using BinStorage();"); + pigServer.registerQuery("a = load '" + fileName + "' using BinStorage();"); pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');"); Iterator<Tuple> iter = pigServer.openIterator("b"); t = iter.next(); assertEquals(t.get(0).toString(), "red"); assertEquals(DataType.toDouble(t.get(1)), 0.3); assertFalse(iter.hasNext()); + Util.deleteFile(cluster, fileName); } static public class TitleNGrams extends EvalFunc<DataBag> { Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java?rev=882208&r1=882207&r2=882208&view=diff ============================================================================== --- 
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java (original) +++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java Thu Nov 19 17:16:21 2009 @@ -109,8 +109,6 @@ tmpDir.delete(); tmpDir.mkdir(); - File tempDir = new File(tmpDir.getAbsolutePath()); - Util.deleteDirectory(tempDir); File udf1Dir = new File(tmpDir.getAbsolutePath()+FILE_SEPARATOR+"com"+FILE_SEPARATOR+"xxx"+FILE_SEPARATOR+"udf1"); udf1Dir.mkdirs(); File udf2Dir = new File(tmpDir.getAbsolutePath()+FILE_SEPARATOR+"com"+FILE_SEPARATOR+"xxx"+FILE_SEPARATOR+"udf2"); @@ -172,18 +170,18 @@ int LOOP_COUNT = 40; File tmpFile = File.createTempFile("test", "txt"); - PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + tmpFile.deleteOnExit(); + String input[] = new String[LOOP_COUNT]; Random r = new Random(1); int rand; for(int i = 0; i < LOOP_COUNT; i++) { rand = r.nextInt(100); - ps.println(rand); + input[i] = Integer.toString(rand); } - ps.close(); - + Util.createInputFile(cluster, tmpFile.getCanonicalPath(), input); FileLocalizer.deleteTempFiles(); PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties()); - pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile.toString()) + "' using TestUDF2() AS (num:chararray);"); + pigServer.registerQuery("A = LOAD '" + tmpFile.getCanonicalPath() + "' using TestUDF2() AS (num:chararray);"); pigServer.registerQuery("B = foreach A generate TestUDF1(num);"); Iterator<Tuple> iter = pigServer.openIterator("B"); if(!iter.hasNext()) fail("No output found"); @@ -192,8 +190,8 @@ assertTrue(t.get(0) instanceof Integer); assertTrue((Integer)t.get(0) == 1); } - - Util.deleteDirectory(tempDir); + Util.deleteFile(cluster, tmpFile.getCanonicalPath()); + Util.deleteDirectory(tmpDir); } @After