Author: rohini
Date: Sun Jun 14 20:13:51 2015
New Revision: 1685455
URL: http://svn.apache.org/r1685455
Log:
PIG-4365: TOP udf should implement Accumulator interface (eyal via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/TOP.java
pig/trunk/test/org/apache/pig/builtin/TestTOP.java
pig/trunk/test/org/apache/pig/test/TestAccumulator.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jun 14 20:13:51 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4365: TOP udf should implement Accumulator interface (eyal via rohini)
+
PIG-4570: Allow AvroStorage to use a class for the schema (pmazak via daijy)
BUG FIXES
Modified: pig/trunk/src/org/apache/pig/builtin/TOP.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TOP.java?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TOP.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TOP.java Sun Jun 14 20:13:51 2015
@@ -27,6 +27,7 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
@@ -68,7 +69,7 @@ import org.apache.pig.impl.logicalLayer.
* GENERATE FLATTEN(result); *
* }
*/
-public class TOP extends EvalFunc<DataBag> implements Algebraic{
+public class TOP extends AccumulatorEvalFunc<DataBag> implements Algebraic {
private static final Log log = LogFactory.getLog(TOP.class);
private static BagFactory mBagFactory = BagFactory.getInstance();
private static TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -139,35 +140,27 @@ public class TOP extends EvalFunc<DataBa
}
}
+ // for Accumulator interface: per-group top-n queue, built lazily by accumulate(), dropped by cleanup()
+ private PriorityQueue<Tuple> store = null;
+
@Override
- public DataBag exec(Tuple tuple) throws IOException {
+ public void accumulate(Tuple tuple) throws IOException {
if (tuple == null || tuple.size() < 3) {
- return null;
+ return;
}
try {
int n = (Integer) tuple.get(0);
int fieldNum = (Integer) tuple.get(1);
DataBag inputBag = (DataBag) tuple.get(2);
if (inputBag == null) {
- return null;
+ return;
}
- PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
- new TupleComparator(fieldNum, sortDesc));
- updateTop(store, n, inputBag);
- DataBag outputBag = mBagFactory.newDefaultBag();
- for (Tuple t : store) {
- outputBag.add(t);
- }
- if (log.isDebugEnabled()) {
- if (randomizer.nextInt(1000) == 1) {
- log.debug("outputting a bag: ");
- for (Tuple t : outputBag)
- log.debug("outputting "+t.toDelimitedString("\t"));
- log.debug("==================");
- }
+ if (store == null) {
+ store = new PriorityQueue<Tuple>(n + 1, new TupleComparator(fieldNum, sortDesc));
}
- return outputBag;
+
+ updateTop(store, n, inputBag);
} catch (ExecException e) {
throw new RuntimeException("ExecException executing function: ",
e);
} catch (Exception e) {
@@ -175,6 +168,48 @@ public class TOP extends EvalFunc<DataBa
}
}
+ /**
+ * Returns the top-n tuples gathered by accumulate() since the last cleanup(),
+ * or null if no call in that span supplied a non-null input bag.
+ */
+ @Override
+ public DataBag getValue() {
+ if (store == null) {
+ return null;
+ }
+
+ DataBag outputBag = mBagFactory.newDefaultBag();
+
+ for (Tuple t : store) {
+ outputBag.add(t);
+ }
+
+ if (log.isDebugEnabled()) {
+ if (randomizer.nextInt(1000) == 1) {
+ log.debug("outputting a bag: ");
+ try {
+ for (Tuple t : outputBag) {
+ log.debug("outputting "+t.toDelimitedString("\t"));
+ }
+ } catch (ExecException e) {
+ // sampled debug output is diagnostic only; never let it abort execution
+ log.debug("unable to render output tuple for debug logging", e);
+ }
+ log.debug("==================");
+ }
+ }
+
+ return outputBag;
+ }
+
+ /**
+ * Drops the accumulated queue so the next accumulate() starts from an empty one.
+ */
+ @Override
+ public void cleanup() {
+ store = null;
+ }
+
protected static void updateTop(PriorityQueue<Tuple> store, int limit, DataBag inputBag) {
Iterator<Tuple> itr = inputBag.iterator();
while (itr.hasNext()) {
Modified: pig/trunk/test/org/apache/pig/builtin/TestTOP.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/builtin/TestTOP.java?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/builtin/TestTOP.java (original)
+++ pig/trunk/test/org/apache/pig/builtin/TestTOP.java Sun Jun 14 20:13:51 2015
@@ -33,25 +33,34 @@ import org.junit.Test;
public class TestTOP {
private static TupleFactory tupleFactory_ = TupleFactory.getInstance();
private static BagFactory bagFactory_ = BagFactory.getInstance();
- private static Tuple inputTuple_ = tupleFactory_.newTuple(3);
- private static DataBag dBag_ = bagFactory_.newDefaultBag();
+ private static Tuple inputTuple_ = null;
@BeforeClass
public static void setup() throws ExecException {
+ inputTuple_ = fillTuple(0, 100);
+ }
+
+ public static Tuple fillTuple(int start, int stop) throws ExecException {
+ Tuple tuple = tupleFactory_.newTuple(3);
+
+ DataBag dBag = bagFactory_.newDefaultBag();
+
// set N = 10 i.e retain top 10 tuples
- inputTuple_.set(0, 10);
+ tuple.set(0, 10);
// compare tuples by field number 1
- inputTuple_.set(1, 1);
+ tuple.set(1, 1);
// set the data bag containing the tuples
- inputTuple_.set(2, dBag_);
- // generate tuples of the form (group-1, 1), (group-2, 2) ...
- for (long i = 0; i < 100; i++) {
+ tuple.set(2, dBag);
+ // generate tuples of the form (group-i, i) for start <= i < stop
+ for (long i = start; i < stop; i++) {
Tuple nestedTuple = tupleFactory_.newTuple(2);
nestedTuple.set(0, "group-" + i);
nestedTuple.set(1, i);
- dBag_.add(nestedTuple);
+ dBag.add(nestedTuple);
}
+
+ return tuple;
}
@Test
@@ -66,6 +75,34 @@ public class TestTOP {
assertEquals(outBag.size(), 10L);
checkItemsLT(outBag, 1, 10);
}
+
+ @Test
+ public void testTOPAccumulator() throws Exception {
+ Tuple firstTuple = fillTuple(0, 50);
+ Tuple secondTuple = fillTuple(50, 100);
+
+ TOP top = new TOP("DESC");
+ top.accumulate(firstTuple);
+ top.accumulate(secondTuple);
+ DataBag outBag = top.getValue();
+ assertEquals(10L, outBag.size());
+ checkItemsGT(outBag, 1, 89);
+
+ top = new TOP("ASC");
+ top.accumulate(firstTuple);
+ top.accumulate(secondTuple);
+ outBag = top.getValue();
+ assertEquals(10L, outBag.size());
+ checkItemsLT(outBag, 1, 10);
+
+ // after cleanup() the same instance must not retain the previous group's tuples
+ top.cleanup();
+ top.accumulate(secondTuple);
+ outBag = top.getValue();
+ assertEquals(10L, outBag.size());
+ checkItemsGT(outBag, 1, 49);
+ checkItemsLT(outBag, 1, 60);
+ }
@Test
public void testTopAlgebraic() throws IOException {
Modified: pig/trunk/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAccumulator.java?rev=1685455&r1=1685454&r2=1685455&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAccumulator.java Sun Jun 14 20:13:51 2015
@@ -558,6 +558,21 @@ public class TestAccumulator {
}
}
+ // PIG-4365
+ @Test
+ public void testAccumWithTOP() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
+ pigServer.registerQuery("B = group A all;");
+ pigServer.registerQuery("D = foreach B { C = TOP(5, 0, A); generate flatten(C); }");
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(
+ new String[] {"(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)" });
+
+ Util.checkQueryOutputsAfterSort(iter, expected);
+ }
+
@Test
public void testAccumWithMultiBuiltin() throws IOException{
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");