Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Mar 4 18:17:39 2016 @@ -18,12 +18,15 @@ package org.apache.pig.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -37,15 +40,21 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.StringTokenizer; -import java.math.BigDecimal; -import java.math.BigInteger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import org.apache.pig.Accumulator; import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.LoadFunc; +import org.apache.pig.PigConstants; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.builtin.ARITY; import org.apache.pig.builtin.AddDuration; import org.apache.pig.builtin.BagSize; @@ -135,6 +144,7 @@ import org.junit.BeforeClass; import org.junit.Test; public class TestBuiltin { + private static final Log LOG = 
LogFactory.getLog(TestBuiltin.class); private static PigServer pigServer; private static Properties properties; private static MiniGenericCluster cluster; @@ -142,6 +152,8 @@ public class TestBuiltin { private TupleFactory tupleFactory = TupleFactory.getInstance(); private BagFactory bagFactory = DefaultBagFactory.getInstance(); + private static Tuple NULL_INPUT_TUPLE; + // some inputs private static Integer[] intInput = { 3, 1, 2, 4, 5, 7, null, 6, 8, 9, 10 }; private static Long[] intAsLong = { 3L, 1L, 2L, 4L, 5L, 7L, null, 6L, 8L, 9L, 10L }; @@ -178,6 +190,10 @@ public class TestBuiltin { // a bag of inputs of that type private static HashMap<String, Tuple> inputMap = new HashMap<String, Tuple>(); + // A mapping between a type name (example: "Integer") and tuples containing + // a bag of inputs of that type for accumulator functions + private static HashMap<String, Tuple[]> inputMapForAccumulate = new HashMap<String, Tuple[]>(); + // A mapping between name of Aggregate function and the input type of its // argument private static HashMap<String, String> allowedInput = new HashMap<String, String>(); @@ -217,6 +233,9 @@ public class TestBuiltin { // first set up EvalFuncMap and expectedMap setupEvalFuncMap(); + NULL_INPUT_TUPLE = TupleFactory.getInstance().newTuple(1); + NULL_INPUT_TUPLE.set(0, null); + expectedMap.put("SUM", new Double(55)); expectedMap.put("DoubleSum", new Double(170.567391834593)); expectedMap.put("IntSum", new Long(55)); @@ -258,8 +277,8 @@ public class TestBuiltin { // set up allowedInput for (String[] aggGroups : aggs) { int i = 0; - for (String agg: aggGroups) { - allowedInput.put(agg, inputTypeAsString[i++]); + for (String agg: aggGroups) { + allowedInput.put(agg, inputTypeAsString[i++]); } } @@ -339,6 +358,19 @@ public class TestBuiltin { inputMap.put("String", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), stringInput)); inputMap.put("DateTime", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), datetimeInput)); + // 
set up input hash for accumulate + inputMapForAccumulate.put("Integer", Util.splitCreateBagOfTuples(intInput,3)); + inputMapForAccumulate.put("IntegerAsLong", Util.splitCreateBagOfTuples(intAsLong,3)); + inputMapForAccumulate.put("Long", Util.splitCreateBagOfTuples(longInput,3)); + inputMapForAccumulate.put("Float", Util.splitCreateBagOfTuples(floatInput,3)); + inputMapForAccumulate.put("FloatAsDouble", Util.splitCreateBagOfTuples(floatAsDouble,3)); + inputMapForAccumulate.put("Double", Util.splitCreateBagOfTuples(doubleInput,3)); + inputMapForAccumulate.put("BigDecimal", Util.splitCreateBagOfTuples(bigDecimalInput,3)); + inputMapForAccumulate.put("BigInteger", Util.splitCreateBagOfTuples(bigIntegerInput,3)); + inputMapForAccumulate.put("ByteArray", Util.splitCreateBagOfTuples(ByteArrayInput,3)); + inputMapForAccumulate.put("ByteArrayAsDouble", Util.splitCreateBagOfTuples(baAsDouble,3)); + inputMapForAccumulate.put("String", Util.splitCreateBagOfTuples(stringInput,3)); + inputMapForAccumulate.put("DateTime", Util.splitCreateBagOfTuples(datetimeInput,3)); } @BeforeClass @@ -380,7 +412,7 @@ public class TestBuiltin { Tuple t3 = TupleFactory.getInstance().newTuple(2); t3.set(0, new DateTime("2007-03-05T03:05:03.000Z")); t3.set(1, "P1D"); - + assertEquals(func1.exec(t1), new DateTime("2009-01-07T01:07:02.000Z")); assertEquals(func1.exec(t2), new DateTime("2008-02-06T02:07:02.000Z")); assertEquals(func1.exec(t3), new DateTime("2007-03-06T03:05:03.000Z")); @@ -403,12 +435,42 @@ public class TestBuiltin { DateTime dt2 = func2.exec(t2); assertEquals(dt2.compareTo(new DateTime("2009-01-07T01:07:01.000Z")), 0); + Tuple t2space = TupleFactory.getInstance().newTuple(1); + t2space.set(0, "2009-01-07 01:07:01.000Z"); + DateTime dt2space = func2.exec(t2space); + assertEquals(dt2space.compareTo(new DateTime("2009-01-07T01:07:01.000Z")), 0); + + Tuple t2dateOnly = TupleFactory.getInstance().newTuple(1); + t2dateOnly.set(0, "2015-05-29"); + DateTime dt2dateOnly = 
func2.exec(t2dateOnly); + assertEquals(dt2dateOnly.compareTo(new DateTime("2015-05-29")), 0); + + Tuple t2dateSpaceHour = TupleFactory.getInstance().newTuple(1); + t2dateSpaceHour.set(0, "2015-05-29 11"); + DateTime dt2dateSpaceHour = func2.exec(t2dateSpaceHour); + assertEquals(dt2dateSpaceHour.compareTo(new DateTime("2015-05-29T11")), 0); + + Tuple t2dateSpaceHourMin = TupleFactory.getInstance().newTuple(1); + t2dateSpaceHourMin.set(0, "2015-05-29 11:38"); + DateTime dt2dateSpaceHourMin = func2.exec(t2dateSpaceHourMin); + assertEquals(dt2dateSpaceHourMin.compareTo(new DateTime("2015-05-29T11:38")), 0); + + Tuple t2dateSpaceHourMinSec = TupleFactory.getInstance().newTuple(1); + t2dateSpaceHourMinSec.set(0, "2015-05-29 11:38:39"); + DateTime dt2dateSpaceHourMinSec = func2.exec(t2dateSpaceHourMinSec); + assertEquals(dt2dateSpaceHourMinSec.compareTo(new DateTime("2015-05-29T11:38:39")), 0); + Tuple t3 = TupleFactory.getInstance().newTuple(1); t3.set(0, "2009-01-07T01:07:01.000+08:00"); DateTime dt3 = func2.exec(t3); assertEquals(dt3.compareTo(new DateTime("2009-01-07T01:07:01.000+08:00", DateTimeZone.forID("+08:00"))), 0); - ToDate2ARGS func3 = new ToDate2ARGS(); + Tuple t3space = TupleFactory.getInstance().newTuple(1); + t3space.set(0, "2009-01-07 01:07:01.000+08:00"); + DateTime dt3space = func2.exec(t3space); + assertEquals(dt3space.compareTo(new DateTime("2009-01-07T01:07:01.000+08:00", DateTimeZone.forID("+08:00"))), 0); + + ToDate2ARGS func3 = new ToDate2ARGS(); Tuple t4 = TupleFactory.getInstance().newTuple(2); t4.set(0, "2009.01.07 AD at 01:07:01"); t4.set(1, "yyyy.MM.dd G 'at' HH:mm:ss"); @@ -420,8 +482,8 @@ public class TestBuiltin { t5.set(1, "yyyy.MM.dd G 'at' HH:mm:ss Z"); DateTime dt5 = func3.exec(t5); assertEquals(dt5.compareTo(new DateTime("2009-01-07T01:07:01.000+08:00")), 0); - - ToDate3ARGS func4 = new ToDate3ARGS(); + + ToDate3ARGS func4 = new ToDate3ARGS(); Tuple t6 = TupleFactory.getInstance().newTuple(3); t6.set(0, "2009.01.07 AD at 01:07:01"); 
t6.set(1, "yyyy.MM.dd G 'at' HH:mm:ss"); @@ -465,13 +527,13 @@ public class TestBuiltin { t12.set(1, "yyyy.MM.dd G 'at' HH:mm:ss Z"); String dtStr4 = func6.exec(t12); assertEquals(dtStr4, "2009.01.07 AD at 01:07:01 +0800"); - + ToMilliSeconds func7 = new ToMilliSeconds(); Tuple t13 = TupleFactory.getInstance().newTuple(1); t13.set(0, new DateTime(1231290421000L)); Long ut2 = func7.exec(t11); assertEquals(ut2.longValue(), 1231290421000L); - + // Null handling t1.set(0, null); assertEquals(func1.exec(t1), null); @@ -907,6 +969,9 @@ public class TestBuiltin { } else { assertEquals(msg, (Double)output, (Double)getExpected(avgTypes[k]), 0.00001); } + + // Check null input + assertNull(avg.exec(NULL_INPUT_TUPLE)); } } @@ -1375,6 +1440,9 @@ public class TestBuiltin { else { assertEquals(msg, (Double)output, (Double)getExpected(sumTypes[k]), 0.00001); } + + // Check null input + assertNull(sum.exec(NULL_INPUT_TUPLE)); } } @@ -1421,7 +1489,7 @@ public class TestBuiltin { else if (inputType == "BigDecimal") assertEquals(msg, ((BigDecimal) output).toPlainString(), ((BigDecimal)getExpected(sumTypes[k])).toPlainString()); else if (inputType == "BigInteger") - assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger)getExpected(sumTypes[k])).toString()); + assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger)getExpected(sumTypes[k])).toString()); else { assertEquals(msg, (Double)output, (Double)getExpected(sumTypes[k]), 0.00001); } @@ -1439,28 +1507,10 @@ public class TestBuiltin { String msg = "[Testing " + minTypes[k] + " on input type: " + getInputType(minTypes[k]) + " ( (output) " + output + " == " + getExpected(minTypes[k]) + " (expected) )]"; + assertForInputType(inputType, msg, getExpected(minTypes[k]), output); - if (inputType == "ByteArray") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Long") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Integer") { - assertEquals(msg, 
output, getExpected(minTypes[k])); - } else if (inputType == "Double") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Float") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "String") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "BigDecimal") { - assertEquals(msg, ((BigDecimal) output).toPlainString(), ((BigDecimal) getExpected(minTypes[k])).toPlainString()); - } else if (inputType == "BigInteger") { - assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger) getExpected(minTypes[k])).toString()); - - } else if (inputType == "DateTime") { - // Compare millis so that we dont have to worry about TZ - assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(minTypes[k])).getMillis()); - } + // Check null input + assertNull(min.exec(NULL_INPUT_TUPLE)); } } @@ -1469,7 +1519,7 @@ public class TestBuiltin { public void testMINIntermediate() throws Exception { String[] minTypes = {"MINIntermediate", "LongMinIntermediate", "IntMinIntermediate", "FloatMinIntermediate", - "BigDecimalMinIntermediate", "BigIntegerMinIntermediate", + "BigDecimalMinIntermediate", "BigIntegerMinIntermediate", "StringMinIntermediate", "DateTimeMinIntermediate"}; for (int k = 0; k < minTypes.length; k++) { EvalFunc<?> min = evalFuncMap.get(minTypes[k]); @@ -1479,28 +1529,7 @@ public class TestBuiltin { String msg = "[Testing " + minTypes[k] + " on input type: " + getInputType(minTypes[k]) + " ( (output) " + ((Tuple)output).get(0) + " == " + getExpected(minTypes[k]) + " (expected) )]"; - - if (inputType == "ByteArray") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k])); - } else if (inputType == "Long") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k])); - } else if (inputType == "Integer") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k])); - } else if (inputType == "Double") { - assertEquals(msg, 
((Tuple)output).get(0), getExpected(minTypes[k])); - } else if (inputType == "Float") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k])); - } else if (inputType == "BigDecimal") { - assertEquals(msg, ((BigDecimal)((Tuple)output).get(0)).toPlainString(), ((BigDecimal)getExpected(minTypes[k])).toPlainString()); - } else if (inputType == "BigInteger") { - assertEquals(msg, ((BigInteger)((Tuple)output).get(0)).toString(), ((BigInteger)getExpected(minTypes[k])).toString()); - System.out.println("xxx: here"); - } else if (inputType == "String") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k])); - } else if (inputType == "DateTime") { - // Compare millis so that we dont have to worry about TZ - assertEquals(msg, ((DateTime)((Tuple)output).get(0)).getMillis(), ((DateTime)getExpected(minTypes[k])).getMillis()); - } + assertForInputType(inputType, msg, getExpected(minTypes[k]), ((Tuple)output).get(0)); } } @@ -1515,27 +1544,23 @@ public class TestBuiltin { String msg = "[Testing " + minTypes[k] + " on input type: " + getInputType(minTypes[k]) + " ( (output) " + output + " == " + getExpected(minTypes[k]) + " (expected) )]"; + assertForInputType(inputType, msg, getExpected(minTypes[k]), output); + } + } - if (inputType == "ByteArray") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Long") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Integer") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Double") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "Float") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "BigDecimal") { - assertEquals(msg, ((BigDecimal)output).toPlainString(), ((BigDecimal)getExpected(minTypes[k])).toPlainString()); - } else if (inputType == "BigInteger") { - assertEquals(msg, ((BigInteger)output).toString(), 
((BigInteger)getExpected(minTypes[k])).toString()); - } else if (inputType == "String") { - assertEquals(msg, output, getExpected(minTypes[k])); - } else if (inputType == "DateTime") { - // Compare millis so that we dont have to worry about TZ - assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(minTypes[k])).getMillis()); - } + @Test + public void testMINAccumulate() throws Exception { + String[] minTypes = {"MIN", "LongMin", "IntMin", "FloatMin","BigDecimalMin","BigIntegerMin", "StringMin", "DateTimeMin"}; + for (int k = 0; k < minTypes.length; k++) { + Accumulator<?> min = (Accumulator<?>)evalFuncMap.get(minTypes[k]); + String inputType = getInputType(minTypes[k]); + Tuple[] tuples = inputMapForAccumulate.get(inputType); + for (Tuple tup : tuples) + min.accumulate(tup); + Object output = min.getValue(); + String msg = "[Testing " + minTypes[k] + " accumulate on input type: " + getInputType(minTypes[k]) + " ( (output) " + + output + " == " + getExpected(minTypes[k]) + " (expected) )]"; + assertForInputType(inputType, msg, getExpected(minTypes[k]), output); } } @@ -1551,31 +1576,13 @@ public class TestBuiltin { String msg = "[Testing " + maxTypes[k] + " on input type: " + getInputType(maxTypes[k]) + " ( (output) " + output + " == " + getExpected(maxTypes[k]) + " (expected) )]"; + assertForInputType(inputType, msg, getExpected(maxTypes[k]), output); - if (inputType == "ByteArray") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Long") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Integer") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Double") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Float") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "BigDecimal") { - assertEquals(msg, ((BigDecimal)output).toPlainString(), 
((BigDecimal)getExpected(maxTypes[k])).toPlainString()); - } else if (inputType == "BigInteger") { - assertEquals(msg, ((BigInteger)output).toString(), ((BigInteger)getExpected(maxTypes[k])).toString()); - } else if (inputType == "String") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "DateTime") { - // Compare millis so that we dont have to worry about TZ - assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(maxTypes[k])).getMillis()); - } + // Check null input + assertNull(max.exec(NULL_INPUT_TUPLE)); } } - @Test public void testMAXIntermed() throws Exception { @@ -1590,27 +1597,7 @@ public class TestBuiltin { String msg = "[Testing " + maxTypes[k] + " on input type: " + getInputType(maxTypes[k]) + " ( (output) " + ((Tuple)output).get(0) + " == " + getExpected(maxTypes[k]) + " (expected) )]"; - - if (inputType == "ByteArray") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k])); - } else if (inputType == "Long") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k])); - } else if (inputType == "Integer") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k])); - } else if (inputType == "Double") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k])); - } else if (inputType == "Float") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k])); - } else if (inputType == "BigDecimal") { - assertEquals(msg, ((BigDecimal)((Tuple)output).get(0)).toPlainString(), ((BigDecimal)getExpected(maxTypes[k])).toPlainString()); - } else if (inputType == "BigInteger") { - assertEquals(msg, ((BigInteger)((Tuple)output).get(0)).toString(), ((BigInteger)getExpected(maxTypes[k])).toString()); - } else if (inputType == "String") { - assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k])); - } else if (inputType == "DateTime") { - // Compare millis so that we dont have to worry about TZ - assertEquals(msg, 
((DateTime)((Tuple)output).get(0)).getMillis(), ((DateTime)getExpected(maxTypes[k])).getMillis()); - } + assertForInputType(inputType, msg, getExpected(maxTypes[k]), ((Tuple)output).get(0)); } } @@ -1626,32 +1613,29 @@ public class TestBuiltin { String msg = "[Testing " + maxTypes[k] + " on input type: " + getInputType(maxTypes[k]) + " ( (output) " + output + " == " + getExpected(maxTypes[k]) + " (expected) )]"; - - if (inputType == "ByteArray") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Long") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Integer") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Double") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "Float") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "BigDecimal") { - assertEquals(msg, ((BigDecimal)output).toPlainString(), ((BigDecimal)getExpected(maxTypes[k])).toPlainString()); - } else if (inputType == "BigInteger") { - assertEquals(msg, ((BigInteger)output).toString(), ((BigInteger)getExpected(maxTypes[k])).toString()); - } else if (inputType == "String") { - assertEquals(msg, output, getExpected(maxTypes[k])); - } else if (inputType == "DateTime") { - // Compare millis so that we dont have to worry about TZ - assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(maxTypes[k])).getMillis()); - } + assertForInputType(inputType, msg, getExpected(maxTypes[k]), output); } } @Test + public void testMAXAccumulate() throws Exception { + String[] maxTypes = {"MAX", "LongMax", "IntMax", "FloatMax", "BigDecimalMax", "BigIntegerMax", "StringMax", "DateTimeMax"}; + for (int k = 0; k < maxTypes.length; k++) { + Accumulator<?> max = (Accumulator<?>)evalFuncMap.get(maxTypes[k]); + String inputType = getInputType(maxTypes[k]); + Tuple[] tuples = inputMapForAccumulate.get(inputType); + for (Tuple tup : tuples) + 
max.accumulate(tup); + Object output = max.getValue(); + + String msg = "[Testing " + maxTypes[k] + " accumulate on input type: " + getInputType(maxTypes[k]) + " ( (output) " + + output + " == " + getExpected(maxTypes[k]) + " (expected) )]"; + assertForInputType(inputType, msg, getExpected(maxTypes[k]), output); + } + } + + @Test public void testMathFuncs() throws Exception { Random generator = new Random(); generator.setSeed(System.currentTimeMillis()); @@ -1939,7 +1923,7 @@ public class TestBuiltin { t3.set(0, null); t3.set(1, "^\\/search\\/iy\\/(.*?)\\/.*"); t3.set(2, 2); - + Tuple t4 = tupleFactory.newTuple(3); t4.set(0,"this is a match"); t4.set(1, "this is a (.+?)"); @@ -2185,7 +2169,7 @@ public class TestBuiltin { } assertTrue("null in tobag result", s.contains(null)); } - + @Test public void testTOBAGSupportsTuplesInInput() throws IOException { String[][] expected = { @@ -2401,7 +2385,7 @@ public class TestBuiltin { assertTrue(msg, res.equals(exp)); } - + /** * End-to-end testing of the CONCAT() builtin function for vararg parameters * @throws Exception @@ -2412,17 +2396,17 @@ public class TestBuiltin { Util.createLocalInputFile(input, new String[]{"dummy"}); PigServer pigServer = new PigServer(Util.getLocalTestMode()); pigServer.registerQuery("A = LOAD '"+input+"' as (x:chararray);"); - + pigServer.registerQuery("B = foreach A generate CONCAT('a', CONCAT('b',CONCAT('c','d')));"); Iterator<Tuple> its = pigServer.openIterator("B"); Tuple t = its.next(); assertEquals("abcd",t.get(0)); - + pigServer.registerQuery("B = foreach A generate CONCAT('a', 'b', 'c', 'd');"); its = pigServer.openIterator("B"); t = its.next(); assertEquals("abcd",t.get(0)); - + pigServer.registerQuery("B = foreach A generate CONCAT('a', CONCAT('b','c'), 'd');"); its = pigServer.openIterator("B"); t = its.next(); @@ -2787,11 +2771,11 @@ public class TestBuiltin { assertTrue(rt.get(0).equals("456")); rt = i.next(); assertTrue(rt.get(0).equals("789")); - + // Check when delim specified 
Tuple t4 = tf.newTuple(2); t4.set(0, "123|456|78\"9"); - t4.set(1, "|"); + t4.set(1, "|"); b = f.exec(t4); assertTrue(b.size()==3); i = b.iterator(); @@ -2804,7 +2788,7 @@ public class TestBuiltin { b = f.exec(t2); assertTrue(b==null); - + b = f.exec(t3); assertTrue(b==null); } @@ -2846,7 +2830,7 @@ public class TestBuiltin { result = d.exec(t); assertEquals(2, result.size()); } - + //see PIG-2331 @Test public void testURIwithCurlyBrace() throws Exception { @@ -2887,6 +2871,29 @@ public class TestBuiltin { return expectedMap.get(expectedFor); } + private void assertForInputType(String inputType, String assertMsg, Object expected, Object actual) { + if (inputType == "ByteArray") { + assertEquals(assertMsg, expected, actual); + } else if (inputType == "Long") { + assertEquals(assertMsg, expected, actual); + } else if (inputType == "Integer") { + assertEquals(assertMsg, expected, actual); + } else if (inputType == "Double") { + assertEquals(assertMsg, expected, actual); + } else if (inputType == "Float") { + assertEquals(assertMsg, expected, actual); + } else if (inputType == "BigDecimal") { + assertEquals(assertMsg, ((BigDecimal)expected).toPlainString(), ((BigDecimal)actual).toPlainString()); + } else if (inputType == "BigInteger") { + assertEquals(assertMsg, ((BigInteger)expected).toString(), ((BigInteger)actual).toString()); + } else if (inputType == "String") { + assertEquals(assertMsg, expected, actual); + } else if (inputType == "DateTime") { + // Compare millis so that we dont have to worry about TZ + assertEquals(assertMsg, ((DateTime)expected).getMillis(), ((DateTime)actual).getMillis()); + } + } + @Test public void testKeySet() throws Exception { Map<String, Object> m = new HashMap<String, Object>(); @@ -2933,6 +2940,41 @@ public class TestBuiltin { assertEquals(resultList.get(1), "hadoop"); } + /** + * Tests that VALUESET preserves the schema when the map's value type is primitive. 
+ */ + @Test + public void testValueSetOutputSchemaPrimitiveType() throws FrontendException { + Schema inputSchema = new Schema(); + Schema charArraySchema = new Schema(new FieldSchema(null, DataType.CHARARRAY)); + FieldSchema mapSchema = new FieldSchema(null, charArraySchema, DataType.MAP); + inputSchema.add(mapSchema); + + Schema tupleSchema = new Schema(new FieldSchema(null, charArraySchema, DataType.TUPLE)); + Schema expectedSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG)); + + VALUESET vs = new VALUESET(); + assertEquals(expectedSchema, vs.outputSchema(inputSchema)); + } + + /** + * Tests that VALUESET preserves the schema when the map's value type is complex. + */ + @Test + public void testValueSetOutputSchemaComplexType() throws FrontendException { + Schema inputSchema = new Schema(); + Schema tupleSchema = Schema.generateNestedSchema(DataType.TUPLE, DataType.CHARARRAY); + Schema bagSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG)); + FieldSchema mapSchema = new FieldSchema(null, bagSchema, DataType.MAP); + inputSchema.add(mapSchema); + + Schema tupleOfBagSchema = new Schema(new FieldSchema(null, bagSchema, DataType.TUPLE)); + Schema expectedSchema = new Schema(new FieldSchema(null, tupleOfBagSchema, DataType.BAG)); + + VALUESET vs = new VALUESET(); + assertEquals(expectedSchema, vs.outputSchema(inputSchema)); + } + @SuppressWarnings("unchecked") @Test public void testValueList() throws Exception { @@ -2958,6 +3000,40 @@ public class TestBuiltin { assertEquals((String)resultList.get(2), "hadoop"); } + /** + * Tests that VALUELIST preserves the schema when the map's value type is primitive. 
+ */ + @Test + public void testValueListOutputSchemaPrimitiveType() throws FrontendException { + Schema inputSchema = new Schema(); + Schema charArraySchema = new Schema(new FieldSchema(null, DataType.CHARARRAY)); + FieldSchema mapSchema = new FieldSchema(null, charArraySchema, DataType.MAP); + inputSchema.add(mapSchema); + + Schema tupleSchema = new Schema(new FieldSchema(null, charArraySchema, DataType.TUPLE)); + Schema expectedSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG)); + + VALUELIST vl = new VALUELIST(); + assertEquals(expectedSchema, vl.outputSchema(inputSchema)); + } + + /** + * Tests that VALUELIST preserves the schema when the map's value type is complex. + */ + @Test + public void testValueListOutputSchemaComplexType() throws FrontendException { + Schema inputSchema = new Schema(); + Schema tupleSchema = Schema.generateNestedSchema(DataType.TUPLE, DataType.CHARARRAY); + Schema bagSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG)); + FieldSchema mapSchema = new FieldSchema(null, bagSchema, DataType.MAP); + inputSchema.add(mapSchema); + + Schema tupleOfBagSchema = new Schema(new FieldSchema(null, bagSchema, DataType.TUPLE)); + Schema expectedSchema = new Schema(new FieldSchema(null, tupleOfBagSchema, DataType.BAG)); + + VALUELIST vl = new VALUELIST(); + assertEquals(expectedSchema, vl.outputSchema(inputSchema)); + } @SuppressWarnings("unchecked") @Test @@ -3015,12 +3091,12 @@ public class TestBuiltin { Long years = func1.exec(t); System.out.println("Years: " + years.toString()); Assert.assertEquals(years.longValue(), 7L); - + MonthsBetween func2 = new MonthsBetween(); Long months = func2.exec(t); System.out.println("Months: " + months.toString()); Assert.assertEquals(months.longValue(),84L); - + WeeksBetween func3 = new WeeksBetween(); Long weeks = func3.exec(t); System.out.println("Weeks: " + weeks.toString()); @@ -3058,7 +3134,7 @@ public class TestBuiltin { t1.set(0, 
ToDate.extractDateTime("2010-04-15T08:11:33.020Z")); Tuple t2 = TupleFactory.getInstance().newTuple(1); t2.set(0, ToDate.extractDateTime("2010-04-15T08:11:33.020+08:00")); - + GetYear func1 = new GetYear(); Integer year = func1.exec(t1); assertEquals(year.intValue(), 2010); @@ -3070,31 +3146,31 @@ public class TestBuiltin { assertEquals(month.intValue(), 4); month = func2.exec(t2); assertEquals(month.intValue(), 4); - + GetDay func3 = new GetDay(); Integer day = func3.exec(t1); assertEquals(day.intValue(), 15); day = func3.exec(t2); assertEquals(day.intValue(), 15); - + GetHour func4 = new GetHour(); Integer hour = func4.exec(t1); assertEquals(hour.intValue(), 8); hour = func4.exec(t2); assertEquals(hour.intValue(), 8); - + GetMinute func5 = new GetMinute(); Integer minute = func5.exec(t1); assertEquals(minute.intValue(), 11); minute = func5.exec(t2); assertEquals(minute.intValue(), 11); - + GetSecond func6 = new GetSecond(); Integer second = func6.exec(t1); assertEquals(second.intValue(), 33); second = func6.exec(t2); assertEquals(second.intValue(), 33); - + GetMilliSecond func7 = new GetMilliSecond(); Integer milli = func7.exec(t1); assertEquals(milli.intValue(), 20); @@ -3106,13 +3182,13 @@ public class TestBuiltin { assertEquals(weekyear.intValue(), 2010); weekyear = func8.exec(t2); assertEquals(weekyear.intValue(), 2010); - + GetWeek func9 = new GetWeek(); Integer week = func9.exec(t1); assertEquals(week.intValue(), 15); week = func9.exec(t2); assertEquals(week.intValue(), 15); - + // Null handling t1.set(0, null); assertEquals(func1.exec(t1), null); @@ -3124,7 +3200,7 @@ public class TestBuiltin { assertEquals(func7.exec(t1), null); assertEquals(func8.exec(t1), null); assertEquals(func9.exec(t1), null); - + } @Test @@ -3139,16 +3215,134 @@ public class TestBuiltin { pigServer.registerQuery("A = load '" + inputFileName + "' as (name);"); pigServer.registerQuery("B = foreach A generate name, UniqueID();"); Iterator<Tuple> iter = pigServer.openIterator("B"); - 
iter.next().get(1).equals("0-0"); - iter.next().get(1).equals("0-1"); - iter.next().get(1).equals("0-2"); - iter.next().get(1).equals("0-3"); - iter.next().get(1).equals("0-4"); - iter.next().get(1).equals("1-0"); - iter.next().get(1).equals("1-1"); - iter.next().get(1).equals("1-1"); - iter.next().get(1).equals("1-2"); - iter.next().get(1).equals("1-3"); - iter.next().get(1).equals("1-4"); + assertEquals(iter.next().get(1),"0-0"); + assertEquals(iter.next().get(1),"0-1"); + assertEquals(iter.next().get(1),"0-2"); + assertEquals(iter.next().get(1),"0-3"); + assertEquals(iter.next().get(1),"0-4"); + assertEquals(iter.next().get(1),"1-0"); + assertEquals(iter.next().get(1),"1-1"); + assertEquals(iter.next().get(1),"1-2"); + assertEquals(iter.next().get(1),"1-3"); + assertEquals(iter.next().get(1),"1-4"); + } + + @Test + public void testRANDOMWithJob() throws Exception { + Util.resetStateForExecModeSwitch(); + String inputFileName = "testRANDOM.txt"; + Util.createInputFile(cluster, inputFileName, new String[] + {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"}); + PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); + // running with two mappers + pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10"); + pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true"); + pigServer.registerQuery("A = load '" + inputFileName + "' as (name);"); + pigServer.registerQuery("B = foreach A generate name, RANDOM();"); + Iterator<Tuple> iter = pigServer.openIterator("B"); + double [] mapper1 = new double[5]; + double [] mapper2 = new double[5]; + for( int i = 0; i < 5; i++ ){ + mapper1[i] = (Double) iter.next().get(1); + if( i != 0 ) { + // making sure it's not creating same value + assertNotEquals(mapper1[i-1], mapper1[i], 0.0001); + } + } + for( int i = 0; i < 5; i++ ){ + mapper2[i] = (Double) iter.next().get(1); + if( i != 0 ) { + // making sure it's not creating same value + assertNotEquals(mapper2[i-1], 
mapper2[i], 0.0001); + } + } + // making sure different mappers are creating different random values + for( int i = 0; i < 5; i++ ){ + assertNotEquals(mapper1[i], mapper2[i], 0.0001); + } + } + + + @Test + public void testRANDOM() throws Exception { + Configuration conf = new Configuration(); + PigMapReduce.sJobConfInternal.set(conf); + PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111"); + PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0"); + + org.apache.pig.builtin.RANDOM.resetSeedUniquifier(); + org.apache.pig.builtin.RANDOM r = new org.apache.pig.builtin.RANDOM(); + double [] tmpresult = new double [5]; + + for( int i = 0; i < 5 ; i++ ) { + tmpresult[i] = r.exec(null).doubleValue(); + LOG.info("Return value of RANDOM(): " + tmpresult[i]); + if( i != 0 ) { + //making sure RANDOM isn't returning some fixed number + assertNotEquals(tmpresult[i-1], tmpresult[i], 0.0001); + } + } + + // with different task id, random should return different number + org.apache.pig.builtin.RANDOM.resetSeedUniquifier(); + PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111"); + PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "1"); + r = new org.apache.pig.builtin.RANDOM(); + for( int i = 0; i < 5 ; i++ ) { + assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001); + } + + // with different jobid, random should return completely different number + org.apache.pig.builtin.RANDOM.resetSeedUniquifier(); + PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_112"); + PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0"); + r = new org.apache.pig.builtin.RANDOM(); + for( int i = 0; i < 5 ; i++ ) { + assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001); + } + + // with same jobid and taskid, random should return exact same sequence + // of pseudo-random number + org.apache.pig.builtin.RANDOM.resetSeedUniquifier(); + 
PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111"); + PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0"); + r = new org.apache.pig.builtin.RANDOM(); + for( int i = 0; i < 5 ; i++ ) { + assertEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 ); + } + + // When initialized again, they should return a different random values + // even when jobid and taskid match. + // To cover the case when RANDOM is called more than once in the user's + // script. + // B = FOREACH A generate RANDOM(), RANDOM(); + r = new org.apache.pig.builtin.RANDOM(); + for( int i = 0; i < 5 ; i++ ) { + assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 ); + } + } + + @Test + public void testToMapSchema() throws Exception { + PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties()); + pigServer.registerQuery("A = load '1.txt' as (a0:chararray, a1:int, a2:double, a3);"); + pigServer.registerQuery("B = foreach A generate [a0,a1];"); + Schema s = pigServer.dumpSchema("B"); + Assert.assertEquals(s.toString(), "{map[int]}"); + pigServer.registerQuery("B = foreach A generate [a0,a1,a0,a2];"); + s = pigServer.dumpSchema("B"); + Assert.assertEquals(s.toString(), "{map[]}"); + pigServer.registerQuery("B = foreach A generate [a0,a3];"); + s = pigServer.dumpSchema("B"); + Assert.assertEquals(s.toString(), "{map[]}"); + pigServer.registerQuery("A = load '1.txt' as (a:{(a0:chararray, a1:int)});"); + pigServer.registerQuery("B = foreach A generate TOMAP(a);"); + s = pigServer.dumpSchema("B"); + Assert.assertEquals(s.toString(), "{map[int]}"); + pigServer.registerQuery("A = load '1.txt' as (a:{(a0, a1, a2, a3:int)});"); + pigServer.registerQuery("B = foreach A generate TOMAP(a);"); + s = pigServer.dumpSchema("B"); + Assert.assertEquals(s.toString(), "{map[]}"); + } }
Modified: pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java Fri Mar 4 18:17:39 2016 @@ -143,34 +143,6 @@ public class TestCubeOperator { } @Test - public void testRollupHIIBasic() throws IOException { - // basic correctness test - String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" - + "b = cube a by rollup(x,y) pivot 1;" - + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store c into 'output' using mock.Storage();"; - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } - } - - @Test public void testCubeAndRollup() throws 
IOException { // basic correctness test String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);" @@ -203,38 +175,6 @@ public class TestCubeOperator { } @Test - public void testCubeAndRollupHII() throws IOException { - // basic correctness test - String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);" - + "b = cube a by cube(v,w), rollup(x,y) pivot 1;" - + "c = foreach b generate flatten(group) as (type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as total;" - + "store c into 'output' using mock.Storage();"; - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet - .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", "pet", (long) 1, - (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, "white", "pet", - (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, "miami", - "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, - null, "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( - "dog", "miami", "white", null, (long) 1, (long) 5)), tf.newTuple(Lists - .newArrayList("dog", null, "white", null, (long) 1, (long) 5)), tf - .newTuple(Lists.newArrayList(null, "miami", "white", null, (long) 1, - (long) 5)), tf.newTuple(Lists.newArrayList(null, null, "white", - null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", "miami", - null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", - null, null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList( - null, "miami", null, null, (long) 1, (long) 5)), tf.newTuple(Lists - .newArrayList(null, null, null, null, (long) 1, (long) 5))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } - - } - - @Test public void testCubeMultipleIAliases() throws IOException { // test for input alias to 
cube being assigned multiple times String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);" @@ -636,36 +576,6 @@ public class TestCubeOperator { } @Test - public void testRollupHIIAfterCogroup() throws IOException { - // test for cubing on co-grouped relation - String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); " - + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);" - + "c = cogroup a by a1, b by d2;" - + "d = foreach c generate flatten(a), flatten(b);" - + "e = cube d by rollup(a2,b2) pivot 1;" - + "f = foreach e generate flatten(group), COUNT(cube) as count, SUM(cube.c2) as total;" - + "store f into 'output' using mock.Storage();"; - - Util.registerMultiLineQuery(pigServer, query); - - Set<Tuple> expected = ImmutableSet.of( - tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)), - tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)), - tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)), - tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)), - tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)), - tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)), - tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)), - tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49))); - - List<Tuple> out = data.get("output"); - for (Tuple tup : out) { - assertTrue(expected + " contains " + tup, expected.contains(tup)); - } - } - - @Test public void testExplainCube() throws IOException { // test for explain String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " @@ -686,19 +596,6 @@ public class TestCubeOperator { Util.registerMultiLineQuery(pigServer, query); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - 
pigServer.explain("b", ps); - assertTrue(baos.toString().contains("RollupDimensions")); - } - - @Test - public void testExplainRollupHII() throws IOException { - // test for explain - String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); " - + "b = cube a by rollup(a1,b1) pivot 1;"; - - Util.registerMultiLineQuery(pigServer, query); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos); pigServer.explain("b", ps); assertTrue(baos.toString().contains("RollupDimensions")); Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Mar 4 18:17:39 2016 @@ -250,6 +250,7 @@ public class TestEvalPipeline { return sb.toString(); } + @Override public Schema outputSchema(Schema input) { try { Schema stringSchema = new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY)); @@ -265,6 +266,8 @@ public class TestEvalPipeline { @Override public Map<String, Object> exec(Tuple input) throws IOException { + super.reporter.progress(); + TupleFactory tupleFactory = TupleFactory.getInstance(); ArrayList<Object> objList = new ArrayList<Object>(); objList.add(new Integer(1)); @@ -294,6 +297,7 @@ public class TestEvalPipeline { return myMap; } + @Override public Schema outputSchema(Schema input) { return new Schema(new Schema.FieldSchema(null, DataType.MAP)); } Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1733627&r1=1733626&r2=1733627&view=diff 
============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Fri Mar 4 18:17:39 2016 @@ -21,6 +21,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -58,7 +60,6 @@ import org.apache.pig.impl.util.ObjectSe import org.apache.pig.test.utils.Identity; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -173,9 +174,9 @@ public class TestEvalPipeline2 { // if the conversion happens when minimum conditions for conversion // such as expected number of bytes are met. String[] input = { - "asdf\t12\t1.1\t231\t234", - "sa\t1231\t123.4\t12345678\t1234.567", - "asdff\t1232123\t1.45345\t123456789\t123456789.9" + "asdf\t12\t1.1\t231\t234\t3024123\t3.2492", + "sa\t1231\t123.4\t12345678\t1234.567\t5081123453\t9.181817", + "asdff\t1232123\t1.45345\t123456789\t123456789.9\t1234567\t1.234567" }; Util.createInputFile(cluster, "table_bs_ac", input); @@ -187,7 +188,7 @@ public class TestEvalPipeline2 { pigServer.store("a", output, BinStorage.class.getName()); pigServer.registerQuery("b = load '" + output + "' using BinStorage('Utf8StorageConverter') " - + "as (name: int, age: int, gpa: float, lage: long, dgpa: double);"); + + "as (name: int, age: int, gpa: float, lage: long, dgpa: double, bi:biginteger, bd:bigdecimal);"); Iterator<Tuple> it = pigServer.openIterator("b"); @@ -206,6 +207,8 @@ public class TestEvalPipeline2 { Assert.assertTrue((Float)tup.get(2) == 1.1F); Assert.assertTrue((Long)tup.get(3) == 231L); Assert.assertTrue((Double)tup.get(4) == 234.0); + 
Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("3024123")); + Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("3.2492")); //tuple 2 tup = it.next(); @@ -214,6 +217,8 @@ public class TestEvalPipeline2 { Assert.assertTrue((Float)tup.get(2) == 123.4F); Assert.assertTrue((Long)tup.get(3) == 12345678L); Assert.assertTrue((Double)tup.get(4) == 1234.567); + Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("5081123453")); + Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("9.181817")); //tuple 3 tup = it.next(); @@ -222,6 +227,8 @@ public class TestEvalPipeline2 { Assert.assertTrue((Float)tup.get(2) == 1.45345F); Assert.assertTrue((Long)tup.get(3) == 123456789L); Assert.assertTrue((Double)tup.get(4) == 1.234567899E8); + Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("1234567")); + Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("1.234567")); Util.deleteFile(cluster, "table"); } @@ -823,7 +830,7 @@ public class TestEvalPipeline2 { pigServer.openIterator("c"); } catch (Exception e) { PigException pe = LogUtils.getPigException(e); - Util.checkStrContainsSubStr(pe.getMessage(), "Incompatable schema"); + Util.checkStrContainsSubStr(pe.getMessage(), "Incompatible schema"); return; } Assert.fail(); @@ -1428,7 +1435,6 @@ public class TestEvalPipeline2 { // See PIG-1826 @Test public void testNonStandardData() throws Exception{ - Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType())); String[] input1 = { "0", }; @@ -1441,7 +1447,10 @@ public class TestEvalPipeline2 { pigServer.openIterator("b"); Assert.fail(); } catch (Exception e) { - String message = e.getCause().getCause().getMessage(); + // Tez does not construct exceptions from stacktrace as it will have multiple ones. + // So e.getCause().getCause() will be null + Throwable cause = e.getCause().getCause() == null ? 
e.getCause() : e.getCause().getCause(); + String message = cause.getMessage(); Assert.assertTrue(message.contains(ArrayList.class.getName())); } } @@ -1449,7 +1458,6 @@ public class TestEvalPipeline2 { // See PIG-1826 @Test public void testNonStandardDataWithoutFetch() throws Exception{ - Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType())); Properties props = pigServer.getPigContext().getProperties(); props.setProperty(PigConfiguration.PIG_OPT_FETCH, "false"); String[] input1 = { @@ -1690,4 +1698,26 @@ public class TestEvalPipeline2 { pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer"); } } + + // see PIG-4392 + @Test + public void testRankWithEmptyReduce() throws Exception { + Util.createInputFile(cluster, "table_testRankWithEmptyReduce", new String[]{"1\t2\t3", "4\t5\t6", "7\t8\t9"}); + pigServer.setDefaultParallel(4); + + pigServer.registerQuery("d = load 'table_testRankWithEmptyReduce' as (a:int, b:int, c:int);"); + pigServer.registerQuery("e = rank d by a parallel 4;"); + + Iterator<Tuple> iter = pigServer.openIterator("e"); + + Collection<String> results = new HashSet<String>(); + results.add("(1,1,2,3)"); + results.add("(2,4,5,6)"); + results.add("(3,7,8,9)"); + + Assert.assertTrue(results.contains(iter.next().toString())); + Assert.assertTrue(results.contains(iter.next().toString())); + Assert.assertTrue(results.contains(iter.next().toString())); + Assert.assertFalse(iter.hasNext()); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java Fri Mar 4 18:17:39 2016 @@ -46,9 +46,8 @@ import 
org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.test.utils.TestHelper; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; public class TestFRJoin { @@ -61,8 +60,8 @@ public class TestFRJoin { pigServer = new PigServer(cluster.getExecType(), cluster.getProperties()); } - @Before - public void setUp() throws Exception { + @BeforeClass + public static void oneTimeSetup() throws Exception { int LOOP_SIZE = 2; String[] input = new String[2 * LOOP_SIZE]; int k = 0; @@ -85,13 +84,9 @@ public class TestFRJoin { @AfterClass public static void oneTimeTearDown() throws Exception { - cluster.shutDown(); - } - - @After - public void tearDown() throws Exception { Util.deleteFile(cluster, INPUT_FILE); Util.deleteFile(cluster, INPUT_FILE2); + cluster.shutDown(); } public static class FRJoin extends EvalFunc<DataBag> { @@ -442,7 +437,7 @@ public class TestFRJoin { Map<String, Tuple> hashJoin = new HashMap<String, Tuple>(); { pigServer.registerQuery("C = join A by $0 left, B by $0 using 'replicated';"); - pigServer.registerQuery("D = join A by $1 left, B by $1 using 'replicated';"); + pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';"); pigServer.registerQuery("E = union C,D;"); Iterator<Tuple> iter = pigServer.openIterator("E"); @@ -475,14 +470,14 @@ public class TestFRJoin { public void testFRJoinOut9() throws IOException { pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);"); + pigServer.registerQuery("C = UNION A, B;"); + pigServer.registerQuery("D = FILTER C BY x == 1;"); DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance() .newDefaultBag(); Map<String, Tuple> hashFRJoin = new HashMap<String, Tuple>(); Map<String, Tuple> hashJoin = new 
HashMap<String, Tuple>(); { - pigServer.registerQuery("C = join A by $0 left, B by $0 using 'repl';"); - pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';"); - pigServer.registerQuery("E = union C,D;"); + pigServer.registerQuery("E = join C by $0 left, D by $0 using 'repl';"); Iterator<Tuple> iter = pigServer.openIterator("E"); while (iter.hasNext()) { @@ -494,9 +489,7 @@ public class TestFRJoin { } } { - pigServer.registerQuery("C = join A by $0 left, B by $0;"); - pigServer.registerQuery("D = join A by $1 left, B by $1;"); - pigServer.registerQuery("E = union C,D;"); + pigServer.registerQuery("E = join C by $0 left, D by $0;"); Iterator<Tuple> iter = pigServer.openIterator("E"); while (iter.hasNext()) { Tuple tuple = iter.next(); Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Mar 4 18:17:39 2016 @@ -1434,6 +1434,24 @@ public class TestGrunt { } @Test + public void testScriptWithSingleQuoteInsideCommentInGenerate() throws Throwable { + //the query has not store or dump, but in when -check is used + // all statements should be validated + String query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" + + "b = foreach a generate s1,\n" + + "--comment should be ignored even it has single quote ' in the line \n" + + " s2;\n"; + ArrayList<String> msgs = new ArrayList<String>(); // + validate(query, true, msgs.toArray(new String[0])); + query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" + + "b = foreach a generate s1,\n" + + "/* comment should be ignored even it has single quote ' in the line \n" + + "*/ \n" + + " s2;\n"; + validate(query, true, 
msgs.toArray(new String[0])); + } + + @Test public void testDebugOn() throws Throwable { String strCmd = "set debug on\n"; Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Mar 4 18:17:39 2016 @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.client.Re import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.pig.ExecType; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; @@ -51,7 +50,6 @@ import org.apache.pig.data.TupleFactory; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -837,7 +835,6 @@ public class TestHBaseStorage { */ @Test public void testMergeJoin() throws IOException { - Assume.assumeTrue("Skip this test for TEZ. 
See PIG-4315", pig.getPigContext().getExecType().equals(ExecType.LOCAL)); prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary); prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary); pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using " @@ -1080,7 +1077,7 @@ public class TestHBaseStorage { long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); - + Assert.assertEquals(timestamp, col_a_ts); Assert.assertEquals(timestamp, col_b_ts); Assert.assertEquals(timestamp, col_c_ts); @@ -1127,7 +1124,7 @@ public class TestHBaseStorage { long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); - + Assert.assertEquals(timestamp, col_a_ts); Assert.assertEquals(timestamp, col_b_ts); Assert.assertEquals(timestamp, col_c_ts); @@ -1174,7 +1171,7 @@ public class TestHBaseStorage { long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); - + Assert.assertEquals(timestamp, col_a_ts); Assert.assertEquals(timestamp, col_b_ts); Assert.assertEquals(timestamp, col_c_ts); @@ -1219,7 +1216,7 @@ public class TestHBaseStorage { long col_a_ts = getColTimestamp(result, TESTCOLUMN_A); long col_b_ts = getColTimestamp(result, TESTCOLUMN_B); long col_c_ts = getColTimestamp(result, TESTCOLUMN_C); - + Assert.assertEquals("00".substring(v.length()) + v, rowKey); Assert.assertEquals(i, col_a); Assert.assertEquals(i + 0.0, col_b, 1e-6); @@ -1274,13 +1271,27 @@ public class TestHBaseStorage { * @throws ParseException */ @Test - public void testNoWAL() throws IOException, ParseException { + public void testNoWAL() throws Exception { HBaseStorage hbaseStorage = new HBaseStorage(TESTCOLUMN_A, "-noWAL"); Object key = "somekey"; byte type = DataType.CHARARRAY; - 
Assert.assertFalse(hbaseStorage.createPut(key, type).getWriteToWAL()); - Assert.assertFalse(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL()); + Put put = hbaseStorage.createPut(key, type); + Delete delete = hbaseStorage.createDelete(key, type, System.currentTimeMillis()); + boolean hasDurabilityMethod = false; + try { + put.getClass().getMethod("getDurability"); + hasDurabilityMethod = true; + } catch (NoSuchMethodException e) { + } + if (hasDurabilityMethod) { // Hbase version 0.95+ + Object skipWal = Class.forName("org.apache.hadoop.hbase.client.Durability").getField("SKIP_WAL").get(put); + Assert.assertEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal); + Assert.assertEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal); + } else { + Assert.assertFalse(put.getWriteToWAL()); + Assert.assertFalse(delete.getWriteToWAL()); + } } /** @@ -1289,13 +1300,27 @@ public class TestHBaseStorage { * @throws ParseException */ @Test - public void testWIthWAL() throws IOException, ParseException { + public void testWIthWAL() throws Exception { HBaseStorage hbaseStorage = new HBaseStorage(TESTCOLUMN_A); Object key = "somekey"; byte type = DataType.CHARARRAY; - Assert.assertTrue(hbaseStorage.createPut(key, type).getWriteToWAL()); - Assert.assertTrue(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL()); + Put put = hbaseStorage.createPut(key, type); + Delete delete = hbaseStorage.createDelete(key, type, System.currentTimeMillis()); + boolean hasDurabilityMethod = false; + try { + put.getClass().getMethod("getDurability"); + hasDurabilityMethod = true; + } catch (NoSuchMethodException e) { + } + if (hasDurabilityMethod) { // Hbase version 0.95+ + Object skipWal = Class.forName("org.apache.hadoop.hbase.client.Durability").getField("SKIP_WAL").get(put); + Assert.assertNotEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal); + 
Assert.assertNotEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal); + } else { + Assert.assertTrue(put.getWriteToWAL()); + Assert.assertTrue(delete.getWriteToWAL()); + } } /** @@ -1551,7 +1576,7 @@ public class TestHBaseStorage { */ private static long getColTimestamp(Result result, String colName) { byte[][] colArray = Bytes.toByteArrays(colName.split(":")); - return result.getColumnLatestCell(colArray[0], colArray[1]).getTimestamp(); + return result.getColumnLatest(colArray[0], colArray[1]).getTimestamp(); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java Fri Mar 4 18:17:39 2016 @@ -19,6 +19,7 @@ package org.apache.pig.test; import org.apache.commons.cli.ParseException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.Scan; import org.apache.pig.backend.hadoop.hbase.HBaseStorage; import org.apache.pig.impl.util.UDFContext; import org.junit.Assert; @@ -26,6 +27,7 @@ import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Properties; public class TestHBaseStorageParams { @@ -77,6 +79,23 @@ public class TestHBaseStorageParams { doColumnParseTest(storage, "foo:a", "foo:b ", " foo:c,d"); } + /** + * Assert that -maxResultsPerColumnFamily actually gets set on Scan + */ + @Test + public void testSetsMaxResultsPerColumnFamily() throws Exception { + Field scanField = HBaseStorage.class.getDeclaredField("scan"); + scanField.setAccessible(true); + + HBaseStorage storageNoMax = new 
HBaseStorage("", ""); + Scan scan = (Scan)scanField.get(storageNoMax); + Assert.assertEquals(-1, scan.getMaxResultsPerColumnFamily()); + + HBaseStorage storageWithMax = new HBaseStorage("", "-maxResultsPerColumnFamily 123"); + scan = (Scan)scanField.get(storageWithMax); + Assert.assertEquals(123, scan.getMaxResultsPerColumnFamily()); + } + private void doColumnParseTest(HBaseStorage storage, String... names) { Assert.assertEquals("Wrong column count", names.length, storage.getColumnInfoList().size()); Modified: pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java Fri Mar 4 18:17:39 2016 @@ -251,7 +251,7 @@ abstract public class TestJobSubmission jc=jcc.compile(mrPlan, "Test"); job = jc.getWaitingJobs().get(0); - Util.assertParallelValues(-1, -1, -1, 1, job.getJobConf()); + Util.assertParallelValues(-1, -1, 1, 1, job.getJobConf()); util.deleteTable(Bytes.toBytesBinary("test_table")); // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster() Modified: pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java Fri Mar 4 18:17:39 2016 @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import org.apache.pig.PigConfiguration; import 
org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.Tuple; @@ -64,20 +65,24 @@ public class TestLimitVariable { @Test public void testLimitVariable1() throws IOException { + pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "" + true); String query = - "a = load '" + inputFile.getName() + "';" + + "a = load '" + inputFile.getName() + "' as (f1:int, f2:int);" + "b = group a all;" + "c = foreach b generate COUNT(a) as sum;" + "d = order a by $0 DESC;" + - "e = limit d c.sum/2;" // return top half of the tuples + "e = limit d c.sum/2;" + // return top half of the tuples + "f = group e all;" + + "g = foreach f generate AVG(e.$0), SUM(e.$1);" ; Util.registerMultiLineQuery(pigServer, query); - Iterator<Tuple> it = pigServer.openIterator("e"); + Iterator<Tuple> it = pigServer.openIterator("g"); List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { - "(6,15)", "(5,10)", "(4,11)" }); + "(5.0,36)"}); Util.checkQueryOutputs(it, expectedRes); + pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Fri Mar 4 18:17:39 2016 @@ -50,10 +50,9 @@ public abstract class TestLoaderStorerSh hadoopVersion = "23"; } String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", - "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure", - "kryo"}; + "hive-shims-0." 
+ hadoopVersion, "hive-shims-common", "kryo"}; - checkPlan(pp, expectedJars, 7, pigServer.getPigContext()); + checkPlan(pp, expectedJars, 6, pigServer.getPigContext()); } @Test @@ -67,10 +66,9 @@ public abstract class TestLoaderStorerSh hadoopVersion = "23"; } String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", - "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure", - "kryo"}; + "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"}; - checkPlan(pp, expectedJars, 7, pigServer.getPigContext()); + checkPlan(pp, expectedJars, 6, pigServer.getPigContext()); } @Test Modified: pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Mar 4 18:17:39 2016 @@ -1204,13 +1204,13 @@ public class TestLogicalPlanBuilder { try { buildPlan( query + "c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;"); } catch (AssertionFailedError e) { - Assert.assertTrue(e.getMessage().contains("Incompatable field schema")); + Assert.assertTrue(e.getMessage().contains("Incompatible field schema")); } try { buildPlan( query + "c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;"); } catch (AssertionFailedError e) { - Assert.assertTrue(e.getMessage().contains("Incompatable schema")); + Assert.assertTrue(e.getMessage().contains("Incompatible schema")); } } Modified: pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff 
============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java Fri Mar 4 18:17:39 2016 @@ -39,6 +39,7 @@ import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException; @@ -130,7 +131,8 @@ import org.junit.runner.RunWith; "testUDFInMergedCoGroup", "testUDFInMergedJoin", "testSchemaInStoreForDistinctLimit", - "testStorerLimit"}) + "testStorerLimit", + "testFetchOptimizerSideEffect"}) public class TestMRCompiler { static MiniCluster cluster; @@ -1280,5 +1282,27 @@ public class TestMRCompiler { POStore store = (POStore)firstMrOper.reducePlan.getLeaves().get(0); assertEquals(store.getStoreFunc().getClass().getName(), "org.apache.pig.impl.io.InterStorage"); } + + // See PIG-4538 + @Test + public void testFetchOptimizerSideEffect() throws Exception{ + String query = "in1 = LOAD 'data.txt' AS (ident:chararray);" + + "in2 = LOAD 'data.txt' AS (ident:chararray);" + + "in3 = LOAD 'data.txt';" + + "joined = JOIN in1 BY ident LEFT OUTER, in2 BY ident;" + + "store joined into 'output';"; + PhysicalPlan pp = Util.buildPp(pigServer, query); + MROperPlan mp = Util.buildMRPlan(pp, pc); + // isPlanFetchable should not bring side effect: + // set parentPlan for operators + FetchOptimizer.isPlanFetchable(pc, pp); + MapReduceOper op = mp.getLeaves().get(0); + PhysicalOperator store = op.reducePlan.getLeaves().get(0); + POForEach foreach = (POForEach)op.reducePlan.getPredecessors(store).get(0); + 
PhysicalOperator project = foreach.getInputPlans().get(0).getRoots().get(0); + Field parentPlan = PhysicalOperator.class.getDeclaredField("parentPlan"); + parentPlan.setAccessible(true); + assertTrue(parentPlan.get(project)==null); + } } Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java (original) +++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java Fri Mar 4 18:17:39 2016 @@ -52,6 +52,7 @@ public class TestMergeJoin { private static final String INPUT_FILE = "testMergeJoinInput.txt"; private static final String INPUT_FILE2 = "testMergeJoinInput2.txt"; + private static final String INPUT_FILE3 = "testMergeJoinInput3.txt"; private PigServer pigServer; private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster(); @@ -78,6 +79,19 @@ public class TestMergeJoin { Util.createInputFile(cluster, INPUT_FILE, input); Util.createInputFile(cluster, INPUT_FILE2, new String[]{"2"}); + + String[] input3 = new String[LOOP_SIZE]; + for (int i = 0; i<= LOOP_SIZE-1; i++) { + input3[i] = "(" + (i + 1) + ")\t + {"; + for(int j=1;j<=LOOP_SIZE;j++) { + input3[i] = input3[i] + "(" + j + ")"; + if (j!=LOOP_SIZE) { + input3[i] = input3[i] + ","; + } + } + input3[i] = input3[i] + "}"; + } + Util.createInputFile(cluster, INPUT_FILE3, input3); } @AfterClass @@ -91,6 +105,7 @@ public class TestMergeJoin { public void tearDown() throws Exception { Util.deleteFile(cluster, INPUT_FILE); Util.deleteFile(cluster, INPUT_FILE2); + Util.deleteFile(cluster, INPUT_FILE3); } @Test @@ -146,6 +161,61 @@ public class TestMergeJoin { } @Test + public void testMergeJoinWithReplicatedJoin() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as 
(f1:int,f2:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);"); + pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);"); + DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';"); + pigServer.registerQuery("E = join D by A::f1, C by f1 using 'merge';"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbMergeJoin.add(iter.next()); + } + } + { + pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';"); + pigServer.registerQuery("E = join D by A::f1, C by f1;"); + Iterator<Tuple> iter = pigServer.openIterator("E"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertEquals(dbshj.size(), dbMergeJoin.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj)); + } + + @Test + public void testMergeJoinWithForeachFlatten() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (t:(f1:int), b:{(f1:int)});"); + pigServer.registerQuery("C = foreach B generate flatten(t) as f1:int, flatten(b);"); + pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';"); + DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbMergeJoin.add(iter.next()); + } + } + { + pigServer.registerQuery("D = join C by f1, A by f1;"); + Iterator<Tuple> iter = pigServer.openIterator("D"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertEquals(dbshj.size(), dbMergeJoin.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj)); + } + + 
@Test public void testMergeJoinOnMultiFields() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';"); @@ -380,6 +450,33 @@ public class TestMergeJoin { } @Test + public void testMergeJoinWithUDF() throws Exception{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:double);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:double);"); + pigServer.registerQuery("A = FOREACH A GENERATE x, ABS(y) AS y;"); + + DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag(); + { + pigServer.registerQuery("C = JOIN A BY x, B BY x USING 'merge';"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbMergeJoin.add(iter.next()); + } + } + { + pigServer.registerQuery("C = JOIN A BY x, B BY x;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + + while(iter.hasNext()) { + dbshj.add(iter.next()); + } + } + Assert.assertEquals(dbMergeJoin.size(), dbshj.size()); + Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj)); + } + + @Test public void testMergeJoin3Way() throws IOException{ try { pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);"); @@ -417,24 +514,6 @@ public class TestMergeJoin { } @Test - public void testMergeFailWithOrderUDF() throws Exception{ - String query = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);\n" + - "B = LOAD '" + INPUT_FILE + "' as (id, name);\n" + - "A = FOREACH A GENERATE LOWER($0) as id;\n" + - "C = ORDER B by $0 parallel 5;\n" + - "D = join A by id, C by id using 'merge';\n" + - "store D into '/dev/null/1';"; - // verify that this fails parsing sanity checks. - try { - Util.buildPp(pigServer, query); - } catch (Throwable t) { - // expected to fail. 
- return; - } - Assert.fail("Allowed a Merge Join despite a UDF"); - } - - @Test public void testMergeJoinFailure2() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name);");
