gates
Tue, 13 May 2008 14:11:55 -0700
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=656011&r1=656010&r2=656011&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Tue May 13 14:11:21 2008 @@ -17,33 +17,42 @@ */ package org.apache.pig.test.utils; +import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Random; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.builtin.BinStorage; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileLocalizer; +import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.OperatorKey; +import org.apache.pig.impl.mapReduceLayer.MapReduceOper; import org.apache.pig.impl.physicalLayer.plans.ExprPlan; import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan; import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter; import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach; import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate; -import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator; -// import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange; +import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange; import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad; import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange; -// import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage; +import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage; +import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit; import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore; -// import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap; +import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion; +import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression; -import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject; -// import -// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr; -// import -// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Add; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Divide; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Mod; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Multiply; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.arithmeticOperators.Subtract; +import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr; import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr; @@ -52,9 +61,17 @@ import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.NotEqualToExpr; import org.apache.pig.impl.plan.PlanException; -public class GenPhyOp { +public class GenPhyOp{ static Random r = new Random(); - + + public static final byte GTE = 1; + public static final byte GT = 2; + public static final byte LTE = 3; + public static final byte LT = 4; + + public static PigContext pc; +// public static MiniCluster cluster = MiniCluster.buildCluster(); + public static ConstantExpression exprConst() { ConstantExpression ret = new ConstantExpression(new OperatorKey("", r .nextLong())); @@ -106,6 +123,17 @@ return ret; } + public static POGlobalRearrange topGlobalRearrangeOp(){ + POGlobalRearrange ret = new POGlobalRearrange(new OperatorKey("", r + .nextLong())); + return ret; + } + + public static POPackage topPackageOp(){ + POPackage ret = new POPackage(new OperatorKey("", r.nextLong())); + return ret; + } + public static POForEach topForEachOp() { POForEach ret = new POForEach(new OperatorKey("", r .nextLong())); @@ -117,6 +145,11 @@ return ret; } + public static POUnion topUnionOp() { + POUnion ret = new POUnion(new OperatorKey("", r.nextLong())); + return ret; + } + /** * creates the POGenerate operator for * generate grpCol, *. @@ -168,6 +201,65 @@ /** * creates the POGenerate operator for + * generate grpCol, *. + * + * @param grpCol - The column to be grouped on + * @param sample - The sample tuple that is used to infer + * result types and #projects for * + * @return - The POGenerate operator which has the exprplan + * for generate grpCol, * set. + * @throws ExecException + * @throws PlanException + */ + public static POGenerate topGenerateOpWithExPlanLR(int grpCol, Tuple sample) throws ExecException, PlanException { + POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol); + prj1.setResultType(sample.getType(grpCol)); + prj1.setOverloaded(false); + + POCast cst = new POCast(new OperatorKey("",r.nextLong())); + cst.setResultType(sample.getType(grpCol)); + + List<Boolean> toBeFlattened = new LinkedList<Boolean>(); + toBeFlattened.add(false); + + + ExprPlan plan1 = new ExprPlan(); + plan1.add(prj1); + plan1.add(cst); + plan1.connect(prj1, cst); + + List<ExprPlan> inputs = new LinkedList<ExprPlan>(); + inputs.add(plan1); + + POProject rest[] = new POProject[sample.size()]; + POCast csts[] = new POCast[sample.size()]; + int i=-1; + for (POProject project : rest) { + project = new POProject(new OperatorKey("", r.nextLong()), -1, ++i); + project.setResultType(sample.getType(i)); + project.setOverloaded(false); + + csts[i] = new POCast(new OperatorKey("",r.nextLong())); + csts[i].setResultType(sample.getType(i)); + + ExprPlan pl = new ExprPlan(); + pl.add(project); + pl.add(csts[i]); + pl.connect(project, csts[i]); + + toBeFlattened.add(false); + inputs.add(pl); + } + + + + POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()), + inputs, toBeFlattened); + return ret; + } + + /** + * creates the POGenerate operator for * 'generate field'. * * @param field - The column to be generated @@ -200,6 +292,129 @@ } /** + * creates the POGenerate operator for + * 'generate flatten(field)'. + * + * @param field - The column to be generated + * @param sample - The sample tuple that is used to infer + * result type + * @return - The POGenerate operator which has the exprplan + * for 'generate field' set. + * @throws ExecException + */ + public static POGenerate topGenerateOpWithExPlanForFeFlat(int field) throws ExecException { + POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, field); + prj1.setResultType(DataType.BAG); + prj1.setOverloaded(false); + + List<Boolean> toBeFlattened = new LinkedList<Boolean>(); + toBeFlattened.add(true); + + ExprPlan plan1 = new ExprPlan(); + plan1.add(prj1); + + List<ExprPlan> inputs = new LinkedList<ExprPlan>(); + inputs.add(plan1); + + POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()), + inputs, toBeFlattened); + return ret; + } + + /** + * creates the POGenerate operator for + * 'generate field[0] field[1] ...'. + * + * @param field - The columns to be generated + * @param sample - The sample tuple that is used to infer + * result type + * @return - The POGenerate operator which has the exprplan + * for 'generate field[0] field[1]' set. + * @throws ExecException + * @throws PlanException + */ + public static POGenerate topGenerateOpWithExPlanForFe(int[] fields, Tuple sample) throws ExecException, PlanException { + POProject[] prj = new POProject[fields.length]; + + for(int i=0;i<prj.length;i++){ + prj[i] = new POProject(new OperatorKey("", r.nextLong()), -1, fields[i]); + prj[i].setResultType(sample.getType(fields[i])); + prj[i].setOverloaded(false); + } + + + POCast[] cst = new POCast[fields.length]; + + List<Boolean> toBeFlattened = new LinkedList<Boolean>(); + for (POProject project : prj) + toBeFlattened.add(false); + + + + List<ExprPlan> inputs = new LinkedList<ExprPlan>(); + ExprPlan[] plans = new ExprPlan[fields.length]; + for (int i=0;i<plans.length;i++) { + plans[i] = new ExprPlan(); + plans[i].add(prj[i]); + cst[i] = new POCast(new OperatorKey("",r.nextLong())); + cst[i].setResultType(sample.getType(fields[i])); + plans[i].add(cst[i]); + plans[i].connect(prj[i], cst[i]); + inputs.add(plans[i]); + } + + POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()), + inputs, toBeFlattened); + return ret; + } + + /** + * creates the POGenerate operator for + * 'generate field[0] field[1] ...'. + * with the flatten list as specified + * @param field - The columns to be generated + * @param toBeFlattened - The columns to be flattened + * @param sample - The sample tuple that is used to infer + * result type + * @return - The POGenerate operator which has the exprplan + * for 'generate field[0] field[1]' set. + * @throws ExecException + * @throws PlanException + */ + public static POGenerate topGenerateOpWithExPlanForFe(int[] fields, Tuple sample, List<Boolean> toBeFlattened) throws ExecException, PlanException { + POProject[] prj = new POProject[fields.length]; + + for(int i=0;i<prj.length;i++){ + prj[i] = new POProject(new OperatorKey("", r.nextLong()), -1, fields[i]); + prj[i].setResultType(sample.getType(fields[i])); + prj[i].setOverloaded(false); + } + + + POCast[] cst = new POCast[fields.length]; + + /*List<Boolean> toBeFlattened = new LinkedList<Boolean>(); + for (POProject project : prj) + toBeFlattened.add(false);*/ + + List<ExprPlan> inputs = new LinkedList<ExprPlan>(); + ExprPlan[] plans = new ExprPlan[fields.length]; + for (int i=0;i<plans.length;i++) { + plans[i] = new ExprPlan(); + plans[i].add(prj[i]); + cst[i] = new POCast(new OperatorKey("",r.nextLong())); + cst[i].setResultType(sample.getType(fields[i])); + plans[i].add(cst[i]); + plans[i].connect(prj[i], cst[i]); + inputs.add(plans[i]); + } + + POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()), + inputs, toBeFlattened); + return ret; + } + + /** * creates the POLocalRearrange operator with the given index for * group by grpCol * @param index - The input index of this POLocalRearrange operator @@ -208,7 +423,28 @@ * @return - The POLocalRearrange operator * @throws ExecException */ - public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException{ + public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws ExecException, PlanException{ + POGenerate gen = topGenerateOpWithExPlanLR(grpCol, sample); + PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>(); + pp.add(gen); + + POLocalRearrange ret = topLocalRearrangeOp(); + ret.setPlan(pp); + ret.setIndex(index); + ret.setResultType(DataType.TUPLE); + return ret; + } + + /** + * creates the POLocalRearrange operator with the given index for + * group by grpCol + * @param index - The input index of this POLocalRearrange operator + * @param grpCol - The column to be grouped on + * @param sample - Sample tuple needed for topGenerateOpWithExPlan + * @return - The POLocalRearrange operator + * @throws ExecException + */ + public static POLocalRearrange topLocalRearrangeOPWithPlanPlain(int index, int grpCol, Tuple sample) throws ExecException, PlanException{ POGenerate gen = topGenerateOpWithExPlan(grpCol, sample); PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>(); pp.add(gen); @@ -228,7 +464,7 @@ * @return - The POForEach operator * @throws ExecException */ - public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException{ + public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws ExecException, PlanException{ POGenerate gen = topGenerateOpWithExPlanForFe(field, sample); PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>(); pp.add(gen); @@ -239,8 +475,67 @@ return ret; } + /** + * creates the POForEach operator for + * foreach A generate field[0] field[1] + * @param fields - The columns to be generated + * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe + * @return - The POForEach operator + * @throws ExecException + */ + public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample) throws ExecException, PlanException{ + POGenerate gen = topGenerateOpWithExPlanForFe(fields, sample); + PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>(); + pp.add(gen); + + POForEach ret = topForEachOp(); + ret.setPlan(pp); + ret.setResultType(DataType.TUPLE); + return ret; + } + + /** + * creates the POForEach operator for + * foreach A generate field[0] field[1] + * @param fields - The columns to be generated + * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe + * @return - The POForEach operator + * @throws ExecException + */ + public static POForEach topForEachOPWithPlan(int[] fields, Tuple sample, List<Boolean> toBeFlattened) throws ExecException, PlanException{ + POGenerate gen = topGenerateOpWithExPlanForFe(fields, sample, toBeFlattened); + PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>(); + pp.add(gen); + + POForEach ret = topForEachOp(); + ret.setPlan(pp); + ret.setResultType(DataType.TUPLE); + return ret; + } + + /** + * creates the POForEach operator for + * foreach A generate flatten(field) + * @param fields - The columns to be generated + * @param sample - Sample tuple needed for topGenerateOpWithExPlanForFe + * @return - The POForEach operator + * @throws ExecException + */ + public static POForEach topForEachOPWithPlan(int field) throws ExecException, PlanException{ + POGenerate gen = topGenerateOpWithExPlanForFeFlat(field); + PhysicalPlan<PhysicalOperator> pp = new PhysicalPlan<PhysicalOperator>(); + pp.add(gen); + + POForEach ret = topForEachOp(); + ret.setPlan(pp); + ret.setResultType(DataType.TUPLE); + return ret; + } + + public static POLoad topLoadOp() { POLoad ret = new POLoad(new OperatorKey("", r.nextLong())); + ret.setPc(pc); return ret; } @@ -250,7 +545,7 @@ } public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal) - throws PlanException { + throws ExecException, PlanException { POFilter ret = new POFilter(new OperatorKey("", r.nextLong())); ConstantExpression ce1 = GenPhyOp.exprConst(); @@ -278,7 +573,7 @@ } public static POFilter topFilterOpWithProj(int col, int rhsVal) - throws PlanException { + throws ExecException, PlanException { POFilter ret = new POFilter(new OperatorKey("", r.nextLong())); POProject proj = exprProject(); @@ -306,26 +601,219 @@ return ret; } + + public static POFilter topFilterOpWithProj(int col, int rhsVal, + byte CompType) throws ExecException, PlanException { + POFilter ret = new POFilter(new OperatorKey("", r.nextLong())); + + POProject proj = exprProject(); + proj.setResultType(DataType.INTEGER); + proj.setColumn(col); + proj.setOverloaded(false); + + ConstantExpression ce2 = GenPhyOp.exprConst(); + ce2.setValue(rhsVal); + + ComparisonOperator cop = null; + switch (CompType) { + case GenPhyOp.GTE: + cop = GenPhyOp.compGTOrEqualToExpr(); + break; + case GenPhyOp.GT: + cop = GenPhyOp.compGreaterThanExpr(); + break; + case GenPhyOp.LTE: + cop = GenPhyOp.compLTOrEqualToExpr(); + break; + case GenPhyOp.LT: + cop = GenPhyOp.compLessThanExpr(); + break; + } + + cop.setLhs(proj); + cop.setRhs(ce2); + cop.setOperandType(DataType.INTEGER); + + ExprPlan ep = new ExprPlan(); + ep.add(proj); + ep.add(ce2); + ep.add(cop); + + ep.connect(proj, cop); + ep.connect(ce2, cop); + + ret.setPlan(ep); + + return ret; + } + + public static POFilter topFilterOpWithProjWithCast(int col, int rhsVal, byte CompType) + throws ExecException, PlanException { + POFilter ret = new POFilter(new OperatorKey("", r.nextLong())); + + POProject proj = exprProject(); + proj.setResultType(DataType.INTEGER); + proj.setColumn(col); + proj.setOverloaded(false); + + ConstantExpression ce2 = GenPhyOp.exprConst(); + ce2.setValue(rhsVal); + + ComparisonOperator cop = null; + switch(CompType){ + case GenPhyOp.GTE: + cop = GenPhyOp.compGTOrEqualToExpr(); + break; + case GenPhyOp.GT: + cop = GenPhyOp.compGreaterThanExpr(); + break; + case GenPhyOp.LTE: + cop = GenPhyOp.compLTOrEqualToExpr(); + break; + case GenPhyOp.LT: + cop = GenPhyOp.compLessThanExpr(); + break; + } + + POCast cst = new POCast(new OperatorKey("",r.nextLong())); + + cop.setLhs(cst); + cop.setRhs(ce2); + cop.setOperandType(DataType.INTEGER); + + ExprPlan ep = new ExprPlan(); + ep.add(cst); + ep.add(proj); + ep.add(ce2); + ep.add(cop); + + ep.connect(proj, cst); + ep.connect(cst, cop); + ep.connect(ce2, cop); + + ret.setPlan(ep); - // - // public static POGlobalRearrange topGlobalRearrangeOp(){ - // POGlobalRearrange ret = new POGlobalRearrange(new - // OperatorKey("",r.nextLong())); - // return ret; - // } - // - // public static POPackage topPackageOp(){ - // POPackage ret = new POPackage(new OperatorKey("",r.nextLong())); - // return ret; - // } - // + return ret; + } + public static POStore topStoreOp() { POStore ret = new POStore(new OperatorKey("", r.nextLong())); + ret.setPc(pc); + return ret; + } + + public static void setR(Random r) { + GenPhyOp.r = r; + } + + public static MapReduceOper MROp(){ + MapReduceOper ret = new MapReduceOper(new OperatorKey("",r.nextLong())); + return ret; + } + + private static FileSpec getTempFileSpec() throws IOException { + return new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),BinStorage.class.getName()); + } + + public static POSplit topSplitOp() throws IOException{ + POSplit ret = new POSplit(new OperatorKey("",r.nextLong())); + ret.setSplitStore(getTempFileSpec()); return ret; } - // - // public static StartMap topStartMapOp(){ - // StartMap ret = new StartMap(new OperatorKey("",r.nextLong())); - // return ret; - // } + + public static PhysicalPlan<PhysicalOperator> grpChain() throws ExecException, PlanException{ + PhysicalPlan<PhysicalOperator> grpChain = new PhysicalPlan<PhysicalOperator>(); + POLocalRearrange lr = GenPhyOp.topLocalRearrangeOp(); + POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp(); + POPackage pk = GenPhyOp.topPackageOp(); + + grpChain.add(lr); + grpChain.add(gr); + grpChain.add(pk); + + grpChain.connect(lr, gr); + grpChain.connect(gr, pk); + + return grpChain; + } + + public static PhysicalPlan<PhysicalOperator> loadedGrpChain() throws ExecException, PlanException{ + PhysicalPlan<PhysicalOperator> ret = new PhysicalPlan<PhysicalOperator>(); + POLoad ld = GenPhyOp.topLoadOp(); + POLocalRearrange lr = GenPhyOp.topLocalRearrangeOp(); + POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp(); + POPackage pk = GenPhyOp.topPackageOp(); + + ret.add(ld); + ret.add(lr); + ret.add(gr); + ret.add(pk); + + ret.connect(ld, lr); + ret.connect(lr, gr); + ret.connect(gr, pk); + + return ret; + } + + public static PhysicalPlan<PhysicalOperator> loadedFilter() throws ExecException, PlanException{ + PhysicalPlan<PhysicalOperator> ret = new PhysicalPlan<PhysicalOperator>(); + POLoad ld = GenPhyOp.topLoadOp(); + POFilter fl = GenPhyOp.topFilterOp(); + ret.add(ld); + ret.add(fl); + + ret.connect(ld, fl); + return ret; + } + + public static ExprPlan arithPlan() throws PlanException{ + ExprPlan ep = new ExprPlan(); + ConstantExpression ce[] = new ConstantExpression[7]; + for(int i=0;i<ce.length;i++){ + ce[i] = GenPhyOp.exprConst(); + ce[i].setValue(i); + ep.add(ce[i]); + } + + Add ad = new Add(getOK()); + ep.add(ad); + ep.connect(ce[0], ad); + ep.connect(ce[1], ad); + + Divide div = new Divide(getOK()); + ep.add(div); + ep.connect(ce[2], div); + ep.connect(ce[3], div); + + Subtract sub = new Subtract(getOK()); + ep.add(sub); + ep.connect(ad, sub); + ep.connect(div, sub); + + Mod mod = new Mod(getOK()); + ep.add(mod); + ep.connect(ce[4], mod); + ep.connect(ce[5], mod); + + Multiply mul1 = new Multiply(getOK()); + ep.add(mul1); + ep.connect(mod, mul1); + ep.connect(ce[6], mul1); + + Multiply mul2 = new Multiply(getOK()); + ep.add(mul2); + ep.connect(sub, mul2); + ep.connect(mul1, mul2); + + return ep; + } + + private static OperatorKey getOK(){ + return new OperatorKey("",r.nextLong()); + } + + public static void setPc(PigContext pc) { + GenPhyOp.pc = pc; + } } Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=656011&r1=656010&r2=656011&view=diff ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original) +++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Tue May 13 14:11:21 2008 @@ -17,13 +17,24 @@ */ package org.apache.pig.test.utils; +import java.io.IOException; +import java.io.InputStream; import java.util.Iterator; +import java.util.Random; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DefaultBagFactory; import org.apache.pig.data.DefaultTuple; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.logicalLayer.OperatorKey; +import org.apache.pig.impl.physicalLayer.POStatus; +import org.apache.pig.impl.physicalLayer.Result; +import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad; /** * Will contain static methods that will be useful @@ -31,6 +42,7 @@ * */ public class TestHelper { + public static int dispAfterNumTuples = 1000; public static boolean bagContains(DataBag db, Tuple t) { Iterator<Tuple> iter = db.iterator(); for (Tuple tuple : db) { @@ -43,7 +55,8 @@ public static boolean compareBags(DataBag db1, DataBag db2) { if (db1.size() != db2.size()) return false; - + + int i=-1; boolean equal = true; for (Tuple tuple : db2) { boolean contains = false; @@ -57,6 +70,8 @@ equal = false; break; } + /*if(++i%dispAfterNumTuples==0) + System.out.println(i/dispAfterNumTuples);*/ } return equal; } @@ -71,4 +86,129 @@ } return ret; } + + public static DataBag projectBag(DataBag db2, int[] fields) throws ExecException { + DataBag ret = DefaultBagFactory.getInstance().newDefaultBag(); + for (Tuple tuple : db2) { + Tuple t1 = new DefaultTuple(); + for (int fld : fields) { + Object o = tuple.get(fld); + t1.append(o); + } + ret.add(t1); + } + return ret; + } + + public static int compareInputStreams(InputStream exp, InputStream act) throws IOException{ + byte[] bExp = new byte[4096], bAct = new byte[4096]; + + int outLen,inLen = -1; + while(act.read(bAct)!=-1){ + exp.read(bExp); + int cmp = compareByteArray(bExp, bAct); + if(cmp!=0) + return cmp; + } + return 0; + } + + public static int compareByteArray(byte[] b1, byte[] b2){ + if(b1.length>b2.length) + return 1; + else if(b1.length<b2.length) + return -1; + for(int i=0;i<b1.length;i++){ + if(b1[i]>b2[i]) + return 1; + else if(b1[i]<b2[i]) + return -1; + } + return 0; + } + + /*public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc, int dispAftNumTuples) throws ExecException, IOException{ + Random r = new Random(); + + POLoad ldExp = new POLoad(new OperatorKey("", r.nextLong())); + ldExp.setPc(pc); + ldExp.setLFile(expLocal); + + POLoad ldAct = new POLoad(new OperatorKey("", r.nextLong())); + ldAct.setPc(pc); + ldAct.setLFile(actHadoop); + + Tuple t = null; + int numActTuples = -1; + boolean matches = true; + for(Result resAct=ldAct.getNext(t);resAct.returnStatus!=POStatus.STATUS_EOP;resAct=ldAct.getNext(t)){ + Tuple tupAct = (Tuple)resAct.result; + ++numActTuples; + boolean found = false; + for(Result resExp=ldExp.getNext(t);resExp.returnStatus!=POStatus.STATUS_EOP;resExp=ldExp.getNext(t)){ + Tuple tupExp = (Tuple)resExp.result; + if(tupAct.compareTo(tupExp)==0){ + found = true; + ldExp.tearDown(); + break; + } + } + if(!found){ + matches = false; + break; + } + if(numActTuples%dispAftNumTuples ==0) + System.out.println(numActTuples/dispAftNumTuples); + } + + int numExpTuples = -1; + while(ldExp.getNext(t).returnStatus!=POStatus.STATUS_EOP) + ++numExpTuples; + + return (matches && numActTuples==numExpTuples); + }*/ + + public static boolean areFilesSame(FileSpec expLocal, FileSpec actHadoop, PigContext pc) throws ExecException, IOException{ + Random r = new Random(); + + POLoad ldExp = new POLoad(new OperatorKey("", r.nextLong())); + ldExp.setPc(pc); + ldExp.setLFile(expLocal); + + POLoad ldAct = new POLoad(new OperatorKey("", r.nextLong())); + ldAct.setPc(pc); + ldAct.setLFile(actHadoop); + + Tuple t = null; + int numActTuples = -1; + DataBag bagAct = DefaultBagFactory.getInstance().newDefaultBag(); + Result resAct = null; + while((resAct = ldAct.getNext(t)).returnStatus!=POStatus.STATUS_EOP){ + ++numActTuples; + bagAct.add(trimTuple((Tuple)resAct.result)); + } + + int numExpTuples = -1; + DataBag bagExp = DefaultBagFactory.getInstance().newDefaultBag(); + Result resExp = null; + while((resExp = ldExp.getNext(t)).returnStatus!=POStatus.STATUS_EOP){ + ++numExpTuples; + bagExp.add(trimTuple((Tuple)resExp.result)); + } + + if(numActTuples!=numExpTuples) + return false; + + return compareBags(bagExp, bagAct); + } + + private static Tuple trimTuple(Tuple t){ + Tuple ret = TupleFactory.getInstance().newTuple(); + for (Object o : t.getAll()) { + DataByteArray dba = (DataByteArray)o; + DataByteArray nDba = new DataByteArray(dba.toString().trim().getBytes()); + ret.append(nDba); + } + return ret; + } }