Author: rohini
Date: Sun Jun 14 20:13:51 2015
New Revision: 1685455

URL: http://svn.apache.org/r1685455
Log:
PIG-4365: TOP udf should implement Accumulator interface (eyal via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/builtin/TOP.java
    pig/trunk/test/org/apache/pig/builtin/TestTOP.java
    pig/trunk/test/org/apache/pig/test/TestAccumulator.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jun 14 20:13:51 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4365: TOP udf should implement Accumulator interface (eyal via rohini)
+
 PIG-4570: Allow AvroStorage to use a class for the schema (pmazak via daijy)
 
 BUG FIXES

Modified: pig/trunk/src/org/apache/pig/builtin/TOP.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TOP.java?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TOP.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TOP.java Sun Jun 14 20:13:51 2015
@@ -27,6 +27,7 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.AccumulatorEvalFunc;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
@@ -68,7 +69,7 @@ import org.apache.pig.impl.logicalLayer.
  *          GENERATE FLATTEN(result); *          
  *  }
  */
-public class TOP extends EvalFunc<DataBag> implements Algebraic{
+public class TOP extends AccumulatorEvalFunc<DataBag> implements Algebraic {
     private static final Log log = LogFactory.getLog(TOP.class);
     private static BagFactory mBagFactory = BagFactory.getInstance();
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -139,35 +140,27 @@ public class TOP extends EvalFunc<DataBa
         }
     }
 
+    // for Accumulator interface
+    private PriorityQueue<Tuple> store = null;
+    
     @Override
-    public DataBag exec(Tuple tuple) throws IOException {
+    public void accumulate(Tuple tuple) throws IOException {
         if (tuple == null || tuple.size() < 3) {
-            return null;
+            return;
         }
         try {
             int n = (Integer) tuple.get(0);
             int fieldNum = (Integer) tuple.get(1);
             DataBag inputBag = (DataBag) tuple.get(2);
             if (inputBag == null) {
-                return null;
+                return;
             }
 
-            PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
-                    new TupleComparator(fieldNum, sortDesc));
-            updateTop(store, n, inputBag);
-            DataBag outputBag = mBagFactory.newDefaultBag();
-            for (Tuple t : store) {
-                outputBag.add(t);
-            }
-            if (log.isDebugEnabled()) {
-                if (randomizer.nextInt(1000) == 1) {
-                    log.debug("outputting a bag: ");
-                    for (Tuple t : outputBag)
-                        log.debug("outputting "+t.toDelimitedString("\t"));
-                    log.debug("==================");
-                }
+            if (store == null) {
+                store = new PriorityQueue<Tuple>(n + 1, new TupleComparator(fieldNum, sortDesc));
             }
-            return outputBag;
+            
+            updateTop(store, n, inputBag);
         } catch (ExecException e) {
             throw new RuntimeException("ExecException executing function: ", e);
         } catch (Exception e) {
@@ -175,6 +168,40 @@ public class TOP extends EvalFunc<DataBa
         }
     }
 
+       @Override
+       public DataBag getValue() {
+        if (store == null) {
+            return null;
+        }
+        
+        DataBag outputBag = mBagFactory.newDefaultBag();
+        
+        for (Tuple t : store) {
+            outputBag.add(t);
+        }
+        
+        if (log.isDebugEnabled()) {
+            if (randomizer.nextInt(1000) == 1) {
+                log.debug("outputting a bag: ");
+                try {
+                    for (Tuple t : outputBag) {
+                        log.debug("outputting "+t.toDelimitedString("\t"));
+                    }
+                    } catch (ExecException e) {
+                        throw new RuntimeException("ExecException executing function: ", e);
+                    }
+                log.debug("==================");
+            }
+        }
+        
+        return outputBag;
+    }
+
+    @Override
+    public void cleanup() {
+        store = null;
+    }
+
     protected static void updateTop(PriorityQueue<Tuple> store, int limit, DataBag inputBag) {
         Iterator<Tuple> itr = inputBag.iterator();
         while (itr.hasNext()) {

Modified: pig/trunk/test/org/apache/pig/builtin/TestTOP.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestTOP.java?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestTOP.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestTOP.java Sun Jun 14 20:13:51 2015
@@ -33,25 +33,34 @@ import org.junit.Test;
 public class TestTOP {
     private static TupleFactory tupleFactory_ = TupleFactory.getInstance();
     private static BagFactory bagFactory_ = BagFactory.getInstance();
-    private static Tuple inputTuple_ = tupleFactory_.newTuple(3);
-    private static DataBag dBag_ = bagFactory_.newDefaultBag();
+    private static Tuple inputTuple_ = null;
 
     @BeforeClass
     public static void setup() throws ExecException {
+        inputTuple_ = fillTuple(0, 100);
+    }
+    
+    public static Tuple fillTuple(int start, int stop) throws ExecException {
+        Tuple tuple = tupleFactory_.newTuple(3);
+        
+        DataBag dBag = bagFactory_.newDefaultBag();
+
         // set N = 10 i.e retain top 10 tuples
-        inputTuple_.set(0, 10);
+        tuple.set(0, 10);
         // compare tuples by field number 1
-        inputTuple_.set(1, 1);
+        tuple.set(1, 1);
         // set the data bag containing the tuples
-        inputTuple_.set(2, dBag_);
+        tuple.set(2, dBag);
 
         // generate tuples of the form (group-1, 1), (group-2, 2) ...
-        for (long i = 0; i < 100; i++) {
+        for (long i = start; i < stop; i++) {
             Tuple nestedTuple = tupleFactory_.newTuple(2);
             nestedTuple.set(0, "group-" + i);
             nestedTuple.set(1, i);
-            dBag_.add(nestedTuple);
+            dBag.add(nestedTuple);
         }
+        
+        return tuple;
     }
 
     @Test
@@ -66,6 +75,26 @@ public class TestTOP {
         assertEquals(outBag.size(), 10L);
         checkItemsLT(outBag, 1, 10);
     }
+
+    @Test
+    public void testTOPAccumulator() throws Exception {
+        Tuple firstTuple = fillTuple(0, 50);
+        Tuple secondTuple = fillTuple(50, 100);
+        
+        TOP top = new TOP("DESC");
+        top.accumulate(firstTuple);
+        top.accumulate(secondTuple);
+        DataBag outBag = top.getValue();
+        assertEquals(outBag.size(), 10L);
+        checkItemsGT(outBag, 1, 89);
+
+        top = new TOP("ASC");
+        top.accumulate(firstTuple);
+        top.accumulate(secondTuple);
+        outBag = top.getValue();
+        assertEquals(outBag.size(), 10L);
+        checkItemsLT(outBag, 1, 10);
+    }
 
     @Test
     public void testTopAlgebraic() throws IOException {

Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sun Jun 14 20:13:51 2015
@@ -558,6 +558,21 @@ public class TestAccumulator {
         }
     }
 
+    // Pig 4365
+    @Test
+    public void testAccumWithTOP() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("D = foreach B { C = TOP(5, 0, A); generate flatten(C); }");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+    
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)" });
+        
+        Util.checkQueryOutputsAfterSort(iter, expected);
+    }
+
     @Test
     public void testAccumWithMultiBuiltin() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");


Reply via email to