Author: rohini
Date: Mon Mar 28 16:58:33 2016
New Revision: 1736904
URL: http://svn.apache.org/viewvc?rev=1736904&view=rev
Log:
PIG-4851: Null not padded when input has fewer fields than declared schema for
some loader (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
pig/trunk/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
pig/trunk/test/org/apache/pig/test/TestNewPlanPushUpFilter.java
pig/trunk/test/org/apache/pig/test/TestPigStorage.java
pig/trunk/test/org/apache/pig/test/data/DotFiles/explain1.dot
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Mar 28 16:58:33 2016
@@ -105,6 +105,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4851: Null not padded when input has less fields than declared schema for
some loader (rohini)
+
PIG-4850: Registered jars do not use submit replication (rdblue via cheolsoo)
PIG-4845: Parallel instantiation of classes in Tez cause tasks to fail (rohini)
Modified:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVStorage.java
Mon Mar 28 16:58:33 2016
@@ -29,17 +29,19 @@ import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniCluster;
import org.apache.pig.test.Util;
+import org.junit.Assert;
import org.junit.Test;
public class TestCSVStorage {
protected static final Log LOG = LogFactory.getLog(TestCSVStorage.class);
-
+
private PigServer pigServer;
private MiniCluster cluster;
-
+
public TestCSVStorage() throws ExecException, IOException {
cluster = MiniCluster.buildCluster();
pigServer = new PigServer(ExecType.LOCAL, new Properties());
@@ -59,8 +61,8 @@ public class TestCSVStorage {
Iterator<Tuple> it = pigServer.openIterator("a");
assertEquals(Util.createTuple(new String[] {"foo", "bar", "baz"}),
it.next());
}
-
- @Test
+
+ @Test
public void testQuotedCommas() throws IOException {
String inputFileName = "TestCSVLoader-quotedcommas.txt";
Util.createLocalInputFile(inputFileName, new String[]
{"\"foo,bar,baz\"", "fee,foe,fum"});
@@ -71,11 +73,11 @@ public class TestCSVStorage {
assertEquals(Util.createTuple(new String[] {"foo,bar,baz", null,
null}), it.next());
assertEquals(Util.createTuple(new String[] {"fee", "foe", "fum"}),
it.next());
}
-
+
@Test
public void testQuotedQuotes() throws IOException {
String inputFileName = "TestCSVLoader-quotedquotes.txt";
- Util.createLocalInputFile(inputFileName,
+ Util.createLocalInputFile(inputFileName,
new String[] {"\"foo,\"\"bar\"\",baz\"", "\"\"\"\"\"\"\"\""});
String script = "a = load '" + inputFileName + "' using
org.apache.pig.piggybank.storage.CSVLoader() " +
" as (a:chararray); ";
@@ -84,5 +86,21 @@ public class TestCSVStorage {
assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}),
it.next());
assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
}
-
+
+ @Test
+ public void testNullPadding() throws IOException {
+ String inputFileName = "TestCSVLoader-nullpadding.txt";
+ Util.createLocalInputFile(inputFileName, new String[] { "a", "b,",
"c,d", ",e"});
+ String script = "a = load '" + inputFileName + "' using
org.apache.pig.piggybank.storage.CSVLoader() " +
+ " as (field1, field2); dump a;";
+ Util.registerMultiLineQuery(pigServer, script);
+ Iterator<Tuple> it = pigServer.openIterator("a");
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray("a"), null}), it.next());
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray("b"), null}), it.next());
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray("c"), new DataByteArray("d")}), it.next());
+ assertEquals(Util.createTuple(new DataByteArray[] {new
DataByteArray(""), new DataByteArray("e")}), it.next());
+ Assert.assertFalse(it.hasNext());
+ }
+
+
}
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Mon Mar 28 16:58:33
2016
@@ -73,7 +73,6 @@ import org.apache.pig.bzip2r.Bzip2TextIn
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.CastUtils;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -178,7 +177,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
overwrite.setArgs(1);
overwrite.setArgName("overwrite");
validOptions.addOption(overwrite);
-
+
return validOptions;
}
@@ -225,7 +224,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
if ("true".equalsIgnoreCase(value)) {
overwriteOutput = true;
}
- }
+ }
dontLoadSchema = configuredOptions.hasOption("noschema");
tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -302,31 +301,18 @@ LoadPushDown, LoadMetadata, StoreMetadat
Properties p =
UDFContext.getUDFContext().getUDFProperties(this.getClass(),
new String[] {signature});
String serializedSchema = p.getProperty(signature+".schema");
- if (serializedSchema != null) {
- try {
- schema = new
ResourceSchema(Utils.getSchemaFromString(serializedSchema));
- } catch (ParserException e) {
- mLog.error("Unable to parse serialized schema " +
serializedSchema, e);
- // all bets are off - there's no guarantee that we'll
return
- // either the fields in the data or the fields in the
schema
- // the user specified (or required)
- }
+ if (serializedSchema == null) return tup;
+ try {
+ schema = new
ResourceSchema(Utils.getSchemaFromString(serializedSchema));
+ } catch (ParserException e) {
+ mLog.error("Unable to parse serialized schema " +
serializedSchema, e);
+ // all bets are off - there's no guarantee that we'll return
+ // either the fields in the data or the fields in the schema
+ // the user specified (or required)
}
}
- if (schema == null) {
- // if the number of required fields are less than or equal to
- // the number of fields in the data then we're OK as we've already
- // read only the required number of fields into the tuple. If
- // more fields are required than are in the data then we'll pad
- // with nulls:
- int numRequiredColumns = 0;
- for (int i = 0; mRequiredColumns != null && i <
mRequiredColumns.length; i++)
- if(mRequiredColumns[i])
- ++numRequiredColumns;
- for (int i = tup.size();i < numRequiredColumns; ++i)
- tup.append(null);
- } else {
+ if (schema != null) {
ResourceFieldSchema[] fieldSchemas = schema.getFields();
int tupleIdx = 0;
// If some fields have been projected out, the tuple
@@ -338,7 +324,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
if (tupleIdx >= tup.size()) {
tup.append(null);
}
-
+
Object val = null;
if(tup.get(tupleIdx) != null){
byte[] bytes = ((DataByteArray)
tup.get(tupleIdx)).get();
Modified:
pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
(original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
Mon Mar 28 16:58:33 2016
@@ -18,18 +18,13 @@
package org.apache.pig.newplan.logical.rules;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
-import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.CastExpression;
@@ -58,7 +53,7 @@ public abstract class TypeCastInserter e
public Transformer getNewTransformer() {
return new TypeCastInserterTransformer();
}
-
+
public class TypeCastInserterTransformer extends Transformer {
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
@@ -77,18 +72,15 @@ public abstract class TypeCastInserter e
}
// Now that we've narrowed it down to an operation that *can* have
casts added,
- // (because the user specified some types which might not match
the data) let's
+ // (because the user specified some types which might not match
the data) let's
// see if they're actually needed:
LogicalSchema determinedSchema = determineSchema(op);
- if(atLeastOneCastNeeded(determinedSchema, s)) {
- return true;
- }
-
if(determinedSchema == null || determinedSchema.size() !=
s.size()) {
// we don't know what the data looks like, but the user has
specified
- // that they want a certain number of fields loaded. We'll use
a
- // projection (or pruning) to make sure the columns show up
(with NULL
- // values) or are truncated from the right hand side of the
input data.
+ // that they want a certain number of fields loaded.
+ return true;
+ }
+ if(atLeastOneCastNeeded(determinedSchema, s)) {
return true;
}
@@ -98,7 +90,7 @@ public abstract class TypeCastInserter e
private boolean atLeastOneCastNeeded(LogicalSchema determinedSchema,
LogicalSchema s) {
for (int i = 0; i < s.size(); i++) {
LogicalSchema.LogicalFieldSchema fs = s.getField(i);
- if (fs.type != DataType.BYTEARRAY && (determinedSchema == null
|| (!fs.isEqual(determinedSchema.getField(i))))) {
+ if (fs.type != DataType.BYTEARRAY &&
!fs.isEqual(determinedSchema.getField(i))) {
// we have to cast this field from the default BYTEARRAY
type to
// whatever the user specified in the 'AS' clause of the
LOAD
// statement (the fs.type).
@@ -120,64 +112,36 @@ public abstract class TypeCastInserter e
return;
}
- if(!atLeastOneCastNeeded(determinedSchema, s) && op instanceof
LOLoad) {
- // we're not going to insert any casts, but we might reduce or
increase
- // the number of columns coming out of the LOAD. If the loader
supports
- // it we'll use the 'requiredColumns' functionality rather
than bolting
- // on a FOREACH
- Set<Integer> required = new TreeSet<Integer>();
- for(int i = 0; i < s.size(); ++i) {
- // if we know the data source's schema, pick out the
columns we need,
- // otherwise take the first n
- int index = determinedSchema == null ? i :
determinedSchema.findField(s.getField(i).uid);
- if(index >= 0)
- required.add(index);
- }
-
- // pass the indices of the fields we need to a pruner, and
fire it off
- // so it configures the LOLoad (and the LoadFunc it contains)
- Map<LOLoad, Pair<Map<Integer, Set<String>>, Set<Integer>>>
requiredMap =
- new HashMap<LOLoad,
Pair<Map<Integer,Set<String>>,Set<Integer>>>(1);
- Pair<Map<Integer, Set<String>>, Set<Integer>> pair =
- new Pair<Map<Integer,Set<String>>, Set<Integer>>(null,
required);
- requiredMap.put((LOLoad) op, pair);
- new ColumnPruneVisitor(currentPlan, requiredMap ,
true).visit((LOLoad) op);
-
- // we only want to process this node once, so mark it:
- markCastNoNeed(op);
- return;
- }
-
// For every field, build a logical plan. If the field has a type
// other than byte array, then the plan will be cast(project).
Else
// it will just be project.
LogicalPlan innerPlan = new LogicalPlan();
-
+
LOForEach foreach = new LOForEach(currentPlan);
foreach.setInnerPlan(innerPlan);
foreach.setAlias(op.getAlias());
// Insert the foreach into the plan and patch up the plan.
Operator next = currentPlan.getSuccessors(op).get(0);
currentPlan.insertBetween(op, foreach, next);
-
+
List<LogicalExpressionPlan> exps = new
ArrayList<LogicalExpressionPlan>();
LOGenerate gen = new LOGenerate(innerPlan, exps, new
boolean[s.size()]);
innerPlan.add(gen);
for (int i = 0; i < s.size(); i++) {
LogicalSchema.LogicalFieldSchema fs = s.getField(i);
-
+
LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
- innerPlan.add(innerLoad);
+ innerPlan.add(innerLoad);
innerPlan.connect(innerLoad, gen);
-
+
LogicalExpressionPlan exp = new LogicalExpressionPlan();
-
+
ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
exp.add(prj);
-
+
if (fs.type != DataType.BYTEARRAY && (determinedSchema == null
|| (!fs.isEqual(determinedSchema.getField(i))))) {
- // Either no schema was determined by loader OR the type
+ // Either no schema was determined by loader OR the type
// from the "determinedSchema" is different
// from the type specified - so we need to cast
CastExpression cast = new CastExpression(exp, prj, new
LogicalSchema.LogicalFieldSchema(fs));
@@ -187,7 +151,7 @@ public abstract class TypeCastInserter e
loadFuncSpec =
((LOLoad)op).getFileSpec().getFuncSpec();
} else if (op instanceof LOStream) {
StreamingCommand command =
((LOStream)op).getStreamingCommand();
- HandleSpec streamOutputSpec = command.getOutputSpec();
+ HandleSpec streamOutputSpec = command.getOutputSpec();
loadFuncSpec = new
FuncSpec(streamOutputSpec.getSpec());
} else {
String msg = "TypeCastInserter invoked with an invalid
operator class name: " + innerPlan.getClass().getSimpleName();
@@ -199,7 +163,7 @@ public abstract class TypeCastInserter e
}
markCastInserted(op);
}
-
+
@Override
public OperatorPlan reportChanges() {
return currentPlan;
Modified: pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java
(original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeForEachOptimization.java Mon
Mar 28 16:58:33 2016
@@ -55,37 +55,37 @@ public class TestMergeForEachOptimizatio
LogicalPlan plan = null;
PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
PigServer pigServer = null;
-
+
@Before
public void setup() throws ExecException {
pigServer = new PigServer( pc );
}
-
+
@After
public void tearDown() {
-
+
}
-
+
/**
* Basic test case. Two simple FOREACH statements can be merged to one.
- * @throws Exception
+ * @throws Exception
*/
- @Test
+ @Test
public void testSimple() throws Exception {
String query = "A = load 'file.txt' as (a, b, c);" +
"B = foreach A generate a+b, c-b;" +
"C = foreach B generate $0+5, $1;" +
- "store C into 'empty';";
+ "store C into 'empty';";
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-
+
int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
int outputExprCount1 = getOutputExprCount( newLogicalPlan );
LOForEach foreach1 = getForEachOperator( newLogicalPlan );
Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
-
+
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
-
+
int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
int outputExprCount2 = getOutputExprCount( newLogicalPlan );
@@ -93,26 +93,26 @@ public class TestMergeForEachOptimizatio
LOForEach foreach2 = getForEachOperator( newLogicalPlan );
Assert.assertTrue( foreach2.getAlias().equals( "C" ) );
}
-
+
/**
* Test more complex case where the first for each in the script has inner
plan.
- * @throws Exception
+ * @throws Exception
*/
@Test
public void testComplex() throws Exception {
String query = "A = load 'file.txt' as (a:int, b,
c:bag{t:tuple(c0:int,c1:int)});" +
"B = foreach A { S = ORDER c BY $0; generate $0, COUNT(S), SIZE(S);
};" +
- "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into
'empty';" ;
+ "C = foreach B generate $2+5 as x, $0-$1/2 as y;" + "store C into
'empty';" ;
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-
+
int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
int outputExprCount1 = getOutputExprCount( newLogicalPlan );
LOForEach foreach1 = getForEachOperator( newLogicalPlan );
Assert.assertTrue( foreach1.getAlias().equals( "C" ) );
-
+
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
-
+
int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
// The number of FOREACHes didn't change because one is genereated
because of type cast and
// one is reduced because of the merge.
@@ -125,10 +125,10 @@ public class TestMergeForEachOptimizatio
Assert.assertTrue(newSchema.getField(0).alias.equals("x"));
Assert.assertTrue(newSchema.getField(1).alias.equals("y"));
}
-
+
/**
* One output of first foreach was referred more than once in the second
foreach
- * @throws Exception
+ * @throws Exception
*/
@Test
public void testDuplicateInputs() throws Exception {
@@ -136,84 +136,84 @@ public class TestMergeForEachOptimizatio
"A1 = foreach A generate (int)a0 as a0, (double)a1 as a1;" +
"B = group A1 all;" +
"C = foreach B generate A1;" +
- "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into
'empty';" ;
+ "D = foreach C generate SUM(A1.a0), AVG(A1.a1);" + "store D into
'empty';" ;
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-
+
Operator store = newLogicalPlan.getSinks().get(0);
int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
LOForEach foreach1 =
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue( foreach1.getAlias().equals( "D" ) );
-
+
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
-
+
int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
// The number of FOREACHes didn't change because one is genereated
because of type cast and
// one is reduced because of the merge.
Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
-
+
LOForEach foreach2 =
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue( foreach2.getAlias().equals( "D" ) );
}
-
+
/**
* Not all consecutive FOREACHes can be merged. In this case, the second
FOREACH statment
* has inner plan, which cannot be merged with one before it.
- * @throws Exception
+ * @throws Exception
*/
@Test
public void testNegative1() throws Exception {
String query = "A = LOAD 'file.txt' as (a, b, c,
d:bag{t:tuple(c0:int,c1:int)});" +
"B = FOREACH A GENERATE a+5 AS u, b-c/2 AS v, d AS w;" +
"C = FOREACH B { S = ORDER w BY $0; GENERATE $0 as x, COUNT(S) as y;
};" +
- "store C into 'empty';";
+ "store C into 'empty';";
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-
+
int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
-
+
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
-
+
// Actually MergeForEach optimization is happening here. A new foreach
will be inserted after A because
- // of typ casting. The inserted one and the one in B can be merged due
to this optimization. However,
+ // of typ casting. The inserted one and the one in B can be merged due
to this optimization. However,
// the plan cannot be further optimized because C has inner plan.
Assert.assertEquals( forEachCount1, forEachCount2 );
}
-
+
/**
* MergeForEach Optimization is off if the first statement has a FLATTEN
operator.
- * @throws Exception
+ * @throws Exception
*/
@Test
public void testNegative2() throws Exception {
String query = "A = LOAD 'file.txt' as (a, b, c);" +
"B = FOREACH A GENERATE FLATTEN(a), b, c;" +
- "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;
+ "C = FOREACH B GENERATE $0, $1+$2;" + "store C into 'empty';" ;
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
-
+
int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
-
+
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
-
+
int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
Assert.assertEquals( 2, forEachCount1 );
Assert.assertEquals( 2, forEachCount2 );
}
-
-
+
+
/**
* Ensure that join input order does not get reversed (PIG-1672)
- * @throws Exception
+ * @throws Exception
*/
- @Test
+ @Test
public void testJoinInputOrder() throws Exception {
String query = "l1 = load 'y' as (a);" +
"l2 = load 'z' as (a1,b1,c1,d1);" +
"f1 = foreach l2 generate a1, b1, c1, d1;" +
"f2 = foreach f1 generate a1, b1, c1;" +
- "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into
'empty';" ;
+ "j1 = join f2 by a1, l1 by a using 'replicated';" + "store j1 into
'empty';" ;
LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
int forEachCount1 = getForEachOperatorCount( newLogicalPlan );
@@ -225,15 +225,15 @@ public class TestMergeForEachOptimizatio
}
LOForEach foreachL2 =
(LOForEach)newLogicalPlan.getSuccessors(l2).get(0);
foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(foreachL2).get(0);
-
+
int outputExprCount1 =
((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size();
-
+
PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
optimizer.optimize();
-
+
int forEachCount2 = getForEachOperatorCount( newLogicalPlan );
- Assert.assertEquals( 1, forEachCount1 - forEachCount2 );
-
+ Assert.assertEquals( 0, forEachCount1 - forEachCount2 );
+
loads = newLogicalPlan.getSources();
l2 = null;
for (Operator load : loads) {
@@ -241,21 +241,21 @@ public class TestMergeForEachOptimizatio
l2 = load;
}
foreachL2 = (LOForEach)newLogicalPlan.getSuccessors(l2).get(0);
-
+
int outputExprCount2 =
((LOGenerate)foreachL2.getInnerPlan().getSinks().get(0)).getOutputPlans().size();
-
+
Assert.assertTrue( outputExprCount1 == outputExprCount2 );
Assert.assertTrue( foreachL2.getAlias().equals( "f2" ) );
-
+
LOJoin join = (LOJoin)getOperator(newLogicalPlan, LOJoin.class);
LogicalRelationalOperator leftInp =
(LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(0);
- assertEquals("join child left", leftInp.getAlias(), "f2");
-
+ assertEquals("join child left", leftInp.getAlias(), "f2");
+
LogicalRelationalOperator rightInp =
(LogicalRelationalOperator)newLogicalPlan.getPredecessors(join).get(1);
- assertEquals("join child right", rightInp.getAlias(), "l1");
-
+ assertEquals("join child right", rightInp.getAlias(), "l1");
+
}
private int getForEachOperatorCount(LogicalPlan plan) {
@@ -268,7 +268,7 @@ public class TestMergeForEachOptimizatio
}
return count;
}
-
+
private int getOutputExprCount(LogicalPlan plan) throws IOException {
LOForEach foreach = getForEachOperator( plan );
LogicalPlan inner = foreach.getInnerPlan();
@@ -276,7 +276,7 @@ public class TestMergeForEachOptimizatio
LOGenerate gen = (LOGenerate)ops.get( 0 );
return gen.getOutputPlans().size();
}
-
+
private LOForEach getForEachOperator(LogicalPlan plan) throws IOException {
Iterator<Operator> ops = plan.getOperators();
while( ops.hasNext() ) {
@@ -290,7 +290,7 @@ public class TestMergeForEachOptimizatio
}
return null;
}
-
+
/**
* returns first operator that is an instance of given class c
* @param plan
@@ -303,41 +303,42 @@ public class TestMergeForEachOptimizatio
while( ops.hasNext() ) {
Operator op = ops.next();
if( op.getClass().equals(c)) {
- return op;
+ return op;
}
}
return null;
}
-
+
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
super(p, iterations, new HashSet<String>());
}
-
- protected List<Set<Rule>> buildRuleSets() {
+
+ @Override
+ protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-
+
Set<Rule> s = new HashSet<Rule>();
// add split filter rule
Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
s.add(r);
ls.add(s);
-
+
// Split Set
// This set of rules does splitting of operators only.
// It does not move operators
s = new HashSet<Rule>();
r = new AddForEach( "AddForEach" );
- s.add(r);
+ s.add(r);
ls.add(s);
-
+
s = new HashSet<Rule>();
r = new MergeForEach("MergeForEach");
- s.add(r);
+ s.add(r);
ls.add(s);
return ls;
}
- }
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Mon Mar 28
16:58:33 2016
@@ -575,7 +575,7 @@ public class TestMultiQueryCompiler {
LogicalPlan lp = checkLogicalPlan(2, 1, 7);
- PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 11);
+ PhysicalPlan pp = checkPhysicalPlan(lp, 2, 1, 13);
checkMRPlan(pp, 1, 1, 2);
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
(original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java Mon
Mar 28 16:58:33 2016
@@ -318,6 +318,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( filter instanceof LOFilter );
Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
@@ -335,6 +337,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( load instanceof LOLoad );
Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( filter instanceof LOFilter );
Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
@@ -354,6 +358,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( load instanceof LOLoad );
Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator filter = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( filter instanceof LOFilter );
Operator store = newLogicalPlan.getSuccessors( filter ).get( 0 );
@@ -375,6 +381,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( filter instanceof LOFilter );
Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
@@ -395,6 +403,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( filter instanceof LOFilter );
Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
@@ -414,6 +424,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( filter instanceof LOFilter );
Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
@@ -433,6 +445,8 @@ public class TestNewPlanFilterAboveForea
Assert.assertTrue( filter instanceof LOFilter );
Operator fe = newLogicalPlan.getSuccessors( filter ).get( 0 );
Assert.assertTrue( fe instanceof LOForEach );
+ fe = newLogicalPlan.getSuccessors( fe ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
Operator store = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( store instanceof LOStore );
}
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java Mon Mar 28
16:58:33 2016
@@ -488,7 +488,9 @@ public class TestNewPlanFilterRule {
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
- Operator group = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Operator fe = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( fe instanceof LOForEach );
+ Operator group = newLogicalPlan.getSuccessors( fe ).get( 0 );
Assert.assertTrue( group instanceof LOCogroup );
Operator filter = newLogicalPlan.getSuccessors( group ).get( 0 );
Assert.assertTrue( filter instanceof LOFilter );
Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
(original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java Mon Mar
28 16:58:33 2016
@@ -21,6 +21,7 @@ import static org.apache.pig.newplan.log
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.HashSet;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -39,6 +40,7 @@ import org.apache.pig.newplan.logical.ex
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.newplan.logical.relational.LOLoad;
@@ -172,6 +174,11 @@ public class TestNewPlanLogicalOptimizer
expected.add(DA);
expected.connect(A, DA);
+ // A = foreach
+ LOForEach foreachA =
org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DA, 0, new
HashSet<Integer>());
+ foreachA.setAlias("A");
+ foreachA.neverUseForRealSetSchema(aschema);
+
// B = load
LogicalSchema bschema = new LogicalSchema();
bschema.addField(new LogicalSchema.LogicalFieldSchema(
@@ -193,6 +200,11 @@ public class TestNewPlanLogicalOptimizer
expected.add(DB);
expected.connect(B, DB);
+ // B = foreach
+ LOForEach foreachB =
org.apache.pig.newplan.logical.Util.addForEachAfter(expected, DB, 0, new
HashSet<Integer>());
+ foreachB.setAlias("B");
+ foreachB.neverUseForRealSetSchema(bschema);
+
// C = join
LogicalSchema cschema = new LogicalSchema();
cschema.addField(new LogicalSchema.LogicalFieldSchema(
@@ -221,8 +233,8 @@ public class TestNewPlanLogicalOptimizer
mm.put(1, bprojplan);
C.neverUseForRealSetSchema(cschema);
expected.add(C);
- expected.connect(DA, C);
- expected.connect(DB, C);
+ expected.connect(foreachA, C);
+ expected.connect(foreachB, C);
// D = filter
LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
Modified:
pig/trunk/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1736904&r1=1736903&r2=1736904&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
(original)
+++ pig/trunk/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
Mon Mar 28 16:58:33 2016
@@ -64,25 +64,25 @@ public class TestNewPlanPushDownForeachF
}
/**
- *
+ *
* A simple filter UDF for testing
*
*/
static public class MyFilterFunc extends FilterFunc {
-
+
@Override
public Boolean exec(Tuple input) {
return false;
}
}
-
+
/**
* Old plan is empty, so is the optimized new plan.
*/
@Test
public void testErrorEmptyInput() throws Exception {
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" );
-
+
Assert.assertTrue( newLogicalPlan.getOperators().hasNext() == false );
}
@@ -100,7 +100,7 @@ public class TestNewPlanPushDownForeachF
List<Operator> nexts = newLogicalPlan.getSuccessors( load );
Assert.assertTrue( nexts != null && nexts.size() == 1 );
}
-
+
@Test
public void testForeachNoFlatten() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -108,28 +108,30 @@ public class TestNewPlanPushDownForeachF
"C = order B by $0, $1;" +
"D = store C into 'dummy';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
-
+
@Test
public void testForeachNoSuccessors() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
"B = foreach A generate flatten($1);" +
"Store B into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
}
-
+
@Test
public void testForeachStreaming() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -137,61 +139,65 @@ public class TestNewPlanPushDownForeachF
"C = stream B through `" + "pc -l" + "`;" +
"Store C into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
}
-
+
@Test
public void testForeachDistinct() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
"B = foreach A generate flatten($1);" +
"C = distinct B;" +
"store C into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
}
-
+
@Test
public void testForeachForeach() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
- "B = foreach A generate $0, $1, flatten(1);" +
+ "B = foreach A generate $0, $1, flatten(1);" +
"C = foreach B generate $0;" +
"store C into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( !OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
-
+
@Test
public void testForeachFilter() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
- "B = foreach A generate $0, $1, flatten($2);" +
+ "B = foreach A generate $0, $1, flatten($2);" +
"C = filter B by $1 < 18;" +
"store C into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
@@ -200,15 +206,17 @@ public class TestNewPlanPushDownForeachF
String query = "A = load 'myfile' as (name, age, gpa);" +
"B = foreach A generate $0, $1, flatten($2);" +
"split B into C if $1 < 18, D if $1 >= 18;" +
- "store C into 'output1';" +
+ "store C into 'output1';" +
"store D into 'output2';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
@@ -218,13 +226,15 @@ public class TestNewPlanPushDownForeachF
"B = foreach A generate $0, $1, flatten($2);" +
"C = limit B 10;" +
"store C into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
@@ -235,24 +245,26 @@ public class TestNewPlanPushDownForeachF
"C = load 'anotherfile' as (name, age, preference);" +
"D = union B, C;" +
"store D into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator load = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
load = loads.get( 0 );
else
load = loads.get( 1 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
-
+
@Test
public void testForeachCogroup() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -260,7 +272,7 @@ public class TestNewPlanPushDownForeachF
"C = load 'anotherfile' as (name, age, preference);" +
"D = cogroup B by $0, C by $0;" +
"store D into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
@@ -268,32 +280,36 @@ public class TestNewPlanPushDownForeachF
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator load = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
load = loads.get( 0 );
else
load = loads.get( 1 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
-
+
@Test
public void testForeachGroupBy() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
"B = foreach A generate $0, $1, flatten($2);" +
"C = group B by $0;" +
"store C into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
-
+
@Test
public void testForeachSort() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
@@ -301,16 +317,18 @@ public class TestNewPlanPushDownForeachF
"C = order B by $0, $1;" +
"D = store C into 'dummy';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
- Operator sort = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
+ Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
- Operator foreach = newLogicalPlan.getSuccessors( sort ).get( 0 );
+ foreach = newLogicalPlan.getSuccessors( sort ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
Assert.assertTrue( OptimizerUtils.hasFlatten( (LOForEach)foreach ) );
}
-
+
/**
* Non-pure-projection, not optimizable.
*/
@@ -321,16 +339,18 @@ public class TestNewPlanPushDownForeachF
"C = order B by $0, $1;" +
"D = store C into 'dummy';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
-
-
+
+
/**
* If the flattened field is referenced in the sort condition, then no
optimization can be done.
*/
@@ -341,7 +361,7 @@ public class TestNewPlanPushDownForeachF
"C = order B by $0, $3;" +
"D = store C into 'dummy';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
@@ -360,49 +380,55 @@ public class TestNewPlanPushDownForeachF
"store C into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
-
+
@Test
public void testForeachUDFSort() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
"B = foreach A generate $0, $1, " + Identity.class.getName() + "($2)
;" +
"C = order B by $0, $1;" +
"store C into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
-
+
@Test
public void testForeachCastSort() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa);" +
- "B = foreach A generate (chararray)$0, $1, flatten($2);" +
+ "B = foreach A generate (chararray)$0, $1, flatten($2);" +
"C = order B by $0, $1;" +
"store C into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
-
+
@Test
public void testForeachCross() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa:(letter_grade,
point_score));" +
@@ -413,13 +439,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -454,7 +480,7 @@ public class TestNewPlanPushDownForeachF
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -487,16 +513,16 @@ public class TestNewPlanPushDownForeachF
"store F into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
// No optimization about foreach flatten.
Operator store = newLogicalPlan.getSinks().get( 0 );
Operator limit = newLogicalPlan.getPredecessors(store).get(0);
Operator cross = newLogicalPlan.getPredecessors(limit).get(0);
Assert.assertTrue( cross instanceof LOCross );
}
-
+
/**
- * This actually is a valid case, even though the optimization may not
provide any performance benefit. However, detecting
+ * This actually is a valid case, even though the optimization may not
provide any performance benefit. However, detecting
* such a case requires more coding. Thus, we allow optimization to go
thru in this case.
*/
@Test
@@ -509,13 +535,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -547,13 +573,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -571,7 +597,7 @@ public class TestNewPlanPushDownForeachF
op = newLogicalPlan.getSuccessors( op ).get( 0 );
Assert.assertTrue( op instanceof LOLimit );
}
-
+
/**
* Cast should NOT matter to cross. This is a valid positive test case.
*/
@@ -585,13 +611,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -609,7 +635,7 @@ public class TestNewPlanPushDownForeachF
op = newLogicalPlan.getSuccessors( op ).get( 0 );
Assert.assertTrue( op instanceof LOLimit );
}
-
+
@Test
public void testForeachFRJoin() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa:(letter_grade,
point_score));" +
@@ -619,7 +645,7 @@ public class TestNewPlanPushDownForeachF
"E = limit D 10;" +
"store E into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
List<Operator> loads = newLogicalPlan.getSources();
@@ -627,7 +653,7 @@ public class TestNewPlanPushDownForeachF
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -662,7 +688,7 @@ public class TestNewPlanPushDownForeachF
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -693,7 +719,7 @@ public class TestNewPlanPushDownForeachF
"E = join B by $0, D by $0 using 'replicated';" +
"F = limit E 10;" +
"store F into 'output';";
-
+
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
// No optimization about foreach flatten.
@@ -702,7 +728,7 @@ public class TestNewPlanPushDownForeachF
Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
Assert.assertTrue( join instanceof LOJoin );
}
-
+
/**
* Valid positive test case, even though the benefit from the optimization
is questionable. However, putting in additinal check for
* this condition requires extra coding.
@@ -717,13 +743,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -756,13 +782,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -795,13 +821,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -830,13 +856,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -854,7 +880,7 @@ public class TestNewPlanPushDownForeachF
op = newLogicalPlan.getSuccessors( op ).get( 0 );
Assert.assertTrue( op instanceof LOLimit );
}
-
+
@Test
public void testForeachInnerJoin1() throws Exception {
String query = "A = load 'myfile' as (name, age, gpa:(letter_grade,
point_score));" +
@@ -865,13 +891,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "B" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -911,9 +937,9 @@ public class TestNewPlanPushDownForeachF
Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
Assert.assertTrue( join instanceof LOJoin );
}
-
+
/**
- * This is actually a valid positive test case, even though the benefit of
such optimization is questionable. However,
+ * This is actually a valid positive test case, even though the benefit of
such optimization is questionable. However,
* checking for such condition requires additional coding effort.
*/
@Test
@@ -926,13 +952,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -964,13 +990,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -1002,13 +1028,13 @@ public class TestNewPlanPushDownForeachF
"store E into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
List<Operator> loads = newLogicalPlan.getSources();
Assert.assertTrue( loads.size() == 2 );
Assert.assertTrue( loads.get( 0 ) instanceof LOLoad );
Assert.assertTrue( loads.get( 1 ) instanceof LOLoad );
Operator op = null;
- if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
+ if( ((LOLoad)loads.get( 0 )).getAlias().equals( "A" ) )
op = loads.get( 0 );
else
op = loads.get( 1 );
@@ -1045,7 +1071,7 @@ public class TestNewPlanPushDownForeachF
Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
Assert.assertTrue( join instanceof LOJoin );
}
-
+
// See PIG-1374
@Test
public void testForeachRequiredField() throws Exception {
@@ -1055,7 +1081,7 @@ public class TestNewPlanPushDownForeachF
"store C into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
@@ -1065,7 +1091,7 @@ public class TestNewPlanPushDownForeachF
Operator sort = newLogicalPlan.getSuccessors( foreach1 ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
}
-
+
// See PIG-1706
@Test
public void testForeachWithUserDefinedSchema() throws Exception {
@@ -1076,13 +1102,13 @@ public class TestNewPlanPushDownForeachF
"store d into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator store = newLogicalPlan.getSinks().get( 0 );
LOForEach foreach =
(LOForEach)newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
}
-
+
// See PIG-1751
@Test
public void testForeachWithUserDefinedSchema2() throws Exception {
@@ -1093,7 +1119,7 @@ public class TestNewPlanPushDownForeachF
"store d into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator store = newLogicalPlan.getSinks().get( 0 );
Operator op = newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue(op instanceof LOJoin);
@@ -1112,7 +1138,7 @@ public class TestNewPlanPushDownForeachF
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
- Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" +
+ Assert.assertTrue( "Field \"a1\" is dropped by ColumnMapKeyPrune" +
"even though it should be stored",
((LOLoad)load).getSchema().getField("a1") != null );
}
@@ -1143,6 +1169,7 @@ public class TestNewPlanPushDownForeachF
addPlanTransformListener(new ProjectionPatcher());
}
+ @Override
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
@@ -1178,24 +1205,25 @@ public class TestNewPlanPushDownForeachF
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
super(p, iterations, new HashSet<String>());
}
-
- protected List<Set<Rule>> buildRuleSets() {
+
+ @Override
+ protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-
+
Set<Rule> s = new HashSet<Rule>();
// add split filter rule
Rule r = new LoadTypeCastInserter( "TypeCastInserter" );
s.add(r);
ls.add(s);
-
+
s = new HashSet<Rule>();
r = new PushDownForEachFlatten( "PushDownForEachFlatten" );
- s.add(r);
+ s.add(r);
ls.add(s);
-
+
return ls;
}
- }
+ }
private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
PigServer pigServer = new PigServer( pc );
@@ -1212,16 +1240,18 @@ public class TestNewPlanPushDownForeachF
"C = order B by $0, $1;" +
"D = store C into 'dummy';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator load = newLogicalPlan.getSources().get( 0 );
Assert.assertTrue( load instanceof LOLoad );
Operator foreach = newLogicalPlan.getSuccessors( load ).get( 0 );
Assert.assertTrue( foreach instanceof LOForEach );
+ foreach = newLogicalPlan.getSuccessors( foreach ).get( 0 );
+ Assert.assertTrue( foreach instanceof LOForEach );
Operator sort = newLogicalPlan.getSuccessors( foreach ).get( 0 );
Assert.assertTrue( sort instanceof LOSort );
-
+
}
-
+
@Test
// See PIG-3826
public void testOuterJoin() throws Exception {
@@ -1232,7 +1262,7 @@ public class TestNewPlanPushDownForeachF
"t3 = join B by id LEFT OUTER, t2 by id;" +
"store t3 into 'output';";
LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
-
+
Operator store = newLogicalPlan.getSinks().get( 0 );
Operator join = newLogicalPlan.getPredecessors(store).get(0);
Assert.assertTrue( join instanceof LOJoin );