Author: olga
Date: Thu Aug 14 14:28:58 2008
New Revision: 686049

URL: http://svn.apache.org/viewvc?rev=686049&view=rev
Log:
PIG-368: support for filter UDFs

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=686049&r1=686048&r2=686049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Aug 14 14:28:58 2008 @@ -949,14 +949,16 @@ p.setResultType(func.getType()); currentPlan.add(p); List<LogicalOperator> fromList = func.getPlan().getPredecessors(func); - for (LogicalOperator op : fromList) { - PhysicalOperator from = LogToPhyMap.get(op); - try { - currentPlan.connect(from, p); - } catch (PlanException e) { - log.error("Invalid physical operator in the plan" - + e.getMessage()); - throw new VisitorException(e); + if(fromList!=null){ + for (LogicalOperator op : fromList) { + PhysicalOperator from = LogToPhyMap.get(op); + try { + currentPlan.connect(from, p); + } catch (PlanException e) { + log.error("Invalid physical operator in the plan" + + e.getMessage()); + throw new VisitorException(e); + } } } LogToPhyMap.put(func, p); Modified: 
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=686049&r1=686048&r2=686049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Thu Aug 14 14:28:58 2008 @@ -55,7 +55,9 @@ PhysicalPlan plan; // The root comparison operator of the expression plan - ComparisonOperator comOp; +// ComparisonOperator comOp; + PhysicalOperator comOp; + // The operand type for the comparison operator needed // to call the comparison operators getNext with the @@ -176,8 +178,8 @@ public void setPlan(PhysicalPlan plan) { this.plan = plan; - comOp = (ComparisonOperator) (plan.getLeaves()).get(0); - compOperandType = comOp.getOperandType(); + comOp = plan.getLeaves().get(0); +// compOperandType = comOp.getOperandType(); } public PhysicalPlan getPlan() { Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=686049&r1=686048&r2=686049&view=diff ============================================================================== --- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original) +++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Thu Aug 14 14:28:58 2008 @@ -1325,22 +1325,24 @@ // without this // Assuming all aggregates has only one argument at 
this stage - ExpressionOperator tmpExp = func.getArguments().get(0) ; - if ( (ef instanceof Algebraic) - && (tmpExp instanceof LOProject) - && (((LOProject)tmpExp).getSentinel())) { - - FieldSchema tmpField ; - - try { - // embed the schema above inside a bag - tmpField = new FieldSchema(null, s, DataType.BAG) ; - } - catch (FrontendException e) { - throw new VisitorException(e) ; + if(func.getArguments()!=null && func.getArguments().size()>0){ + ExpressionOperator tmpExp = func.getArguments().get(0) ; + if ( (ef instanceof Algebraic) + && (tmpExp instanceof LOProject) + && (((LOProject)tmpExp).getSentinel())) { + + FieldSchema tmpField ; + + try { + // embed the schema above inside a bag + tmpField = new FieldSchema(null, s, DataType.BAG) ; + } + catch (FrontendException e) { + throw new VisitorException(e) ; + } + + s = new Schema(tmpField) ; } - - s = new Schema(tmpField) ; } // ask the EvalFunc what types of inputs it can handle @@ -2302,6 +2304,9 @@ else if (op instanceof LOConst) { // don't have to do anything } + else if (op instanceof LOUserFunc){ + visit((LOUserFunc)op); + } else { String msg = "Unsupported root operator in inner plan:" + op.getClass().getSimpleName() ; Added: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java?rev=686049&view=auto ============================================================================== --- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java (added) +++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterUDF.java Thu Aug 14 14:28:58 2008 @@ -0,0 +1,75 @@ +package org.apache.pig.test; + + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.Iterator; +import java.util.Random; + +import junit.framework.TestCase; + +import org.apache.pig.EvalFunc; +import 
org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestFilterUDF extends TestCase { + private PigServer pigServer; + + @Before + public void setUp() throws Exception { + pigServer = new PigServer(ExecType.LOCAL); + } + + @After + public void tearDown() throws Exception { + } + + static public class MyFilterFunction extends EvalFunc<Boolean>{ + + @Override + public Boolean exec(Tuple input) throws IOException { + try { + int col = (Integer)input.get(0); + if(col>10) + return true; + } catch (ExecException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return false; + } + + } + + @Test + public void testFilterUDF() throws Exception{ + int LOOP_SIZE = 20; + File tmpFile = File.createTempFile("test", "txt"); + PrintStream ps = new PrintStream(new FileOutputStream(tmpFile)); + for(int i = 1; i <= LOOP_SIZE; i++) { + ps.println(i); + } + ps.close(); + pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "' as (x:int);"); + pigServer.registerQuery("B = filter A by " + MyFilterFunction.class.getName() + "();"); + Iterator<Tuple> iter = pigServer.openIterator("B"); + if(!iter.hasNext()) fail("No Output received"); + int cnt = 0; + while(iter.hasNext()){ + Tuple t = iter.next(); + assertEquals(true,(Integer)t.get(0)>10); + ++cnt; + } + assertEquals(10, cnt); + } +}