Author: daijy
Date: Tue Mar 24 16:42:37 2015
New Revision: 1668933
URL: http://svn.apache.org/r1668933
Log:
PIG-4458: Support UDFs in a FOREACH Before a Merge Join
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Mar 24 16:42:37 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4458: Support UDFs in a FOREACH Before a Merge Join (wattsinabox via daijy)
+
PIG-4454: Upgrade tez to 0.6.0 (daijy)
PIG-4451: Log partition and predicate filter pushdown information and fix
optimizer looping (rohini)
Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL:
http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Tue Mar 24
16:42:37 2015
@@ -1286,9 +1286,9 @@ C = JOIN A BY a1, B BY b1, C BY c1 USING
<li>Data must come directly from either a Load or an Order statement.</li>
<li>There may be filter statements and foreach statements between the sorted
data source and the join statement. The foreach statement should meet the
following conditions:
<ul>
- <li>There should be no UDFs in the foreach statement. </li>
<li>The foreach statement should not change the position of the join keys.
</li>
- <li>There should be no transformation on the join keys which will change
the sort order. </li>
+ <li>There should be no transformation on the join keys which will change
the sort order.</li>
+ <li>UDFs also have to adhere to the previous condition and should not
transform the JOIN keys in a way that would change the sort order.</li>
</ul>
</li>
<li>Data must be sorted on join keys in ascending (ASC) order on both
sides.</li>
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
Tue Mar 24 16:42:37 2015
@@ -70,6 +70,12 @@ public class POMergeJoin extends Physica
private static final long serialVersionUID = 1L;
+ private static final String keyOrderReminder = "Remember that you should " +
+ "not change the order of keys before a merge join in a FOREACH or " +
+ "manipulate join keys in a UDF in a way that would change the sort " +
+ "order. UDFs in a FOREACH are allowed as long as they do not change " +
+ "the join key values in a way that would change the sort order.\n";
+
// flag to indicate when getNext() is called first.
private boolean firstTime = true;
@@ -370,7 +376,9 @@ public class POMergeJoin extends Physica
}
else{ // At this point right side can't be behind.
int errCode = 1102;
- String errMsg = "Data is not sorted on right side. Last two tuples encountered were: \n"+
+ String errMsg = "Data is not sorted on right side. \n" +
+ keyOrderReminder +
+ "Last two tuples encountered were: \n"+
curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
throw new ExecException(errMsg,errCode);
}
@@ -398,7 +406,9 @@ public class POMergeJoin extends Physica
}
else{ // Current key < Prev Key
int errCode = 1102;
- String errMsg = "Data is not sorted on left side. Last two keys encountered were: \n"+
+ String errMsg = "Data is not sorted on left side. \n" +
+ keyOrderReminder +
+ "Last two keys encountered were: \n" +
prevLeftKey+ "\n" + curLeftKey ;
throw new ExecException(errMsg,errCode);
}
@@ -468,7 +478,9 @@ public class POMergeJoin extends Physica
if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
// Sanity check.
int errCode = 1102;
- String errMsg = "Data is not sorted on right side. Last two keys encountered were: \n"+
+ String errMsg = "Data is not sorted on right side. \n" +
+ keyOrderReminder +
+ "Last two keys encountered were: \n"+
prevRightKey+ "\n" + rightKey ;
throw new ExecException(errMsg,errCode);
}
Modified:
pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
(original)
+++
pig/trunk/src/org/apache/pig/newplan/logical/relational/MapSideMergeValidator.java
Tue Mar 24 16:42:37 2015
@@ -59,8 +59,7 @@ public class MapSideMergeValidator {
private boolean isAcceptableForEachOp(Operator lo) throws
LogicalToPhysicalTranslatorException {
if (lo instanceof LOForEach) {
OperatorPlan innerPlan = ((LOForEach) lo).getInnerPlan();
- validateMapSideMerge(innerPlan.getSinks(), innerPlan);
- return !containsUDFs((LOForEach) lo);
+ return validateMapSideMerge(innerPlan.getSinks(), innerPlan);
} else {
return false;
}
@@ -83,22 +82,4 @@ public class MapSideMergeValidator {
}
return true;
}
-
- private boolean containsUDFs(LOForEach fo) throws
LogicalToPhysicalTranslatorException {
- LogicalPlan logExpPlan = fo.getInnerPlan();
- UDFFinder udfFinder;
- try {
- udfFinder = new UDFFinder(logExpPlan);
- udfFinder.visit();
- // TODO (dvryaboy): in the future we could relax this rule by tracing what fields
- // are being passed into the UDF, and only refusing if the UDF is working on the
- // join key. Transforms of other fields should be ok.
- if (udfFinder.getUDFList().size() != 0) {
- return true;
- }
- } catch (FrontendException e) {
- throw new LogicalToPhysicalTranslatorException(e);
- }
- return false;
- }
}
Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1668933&r1=1668932&r2=1668933&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Tue Mar 24 16:42:37
2015
@@ -450,6 +450,33 @@ public class TestMergeJoin {
}
@Test
+ public void testMergeJoinWithUDF() throws Exception{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:double);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:double);");
+ pigServer.registerQuery("A = FOREACH A GENERATE x, org.apache.pig.piggybank.evaluation.math.ABS(y) AS y;");
+
+ DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = JOIN A BY x, B BY x USING 'merge';");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbMergeJoin.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = JOIN A BY x, B BY x;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
+ }
+ Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+ Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+ }
+
+ @Test
public void testMergeJoin3Way() throws IOException{
try {
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id,
name, n);");
@@ -487,24 +514,6 @@ public class TestMergeJoin {
}
@Test
- public void testMergeFailWithOrderUDF() throws Exception{
- String query = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);\n" +
- "B = LOAD '" + INPUT_FILE + "' as (id, name);\n" +
- "A = FOREACH A GENERATE LOWER($0) as id;\n" +
- "C = ORDER B by $0 parallel 5;\n" +
- "D = join A by id, C by id using 'merge';\n" +
- "store D into '/dev/null/1';";
- // verify that this fails parsing sanity checks.
- try {
- Util.buildPp(pigServer, query);
- } catch (Throwable t) {
- // expected to fail.
- return;
- }
- Assert.fail("Allowed a Merge Join despite a UDF");
- }
-
- @Test
public void testMergeJoinFailure2() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name,
n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id,
name);");