Author: daijy
Date: Wed Jan 21 21:53:42 2015
New Revision: 1653670

URL: http://svn.apache.org/r1653670
Log:
PIG-4352: Port local mode tests to Tez - TestUnionOnSchema

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
    pig/trunk/test/tez-local-tests

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 21 21:53:42 2015
@@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4352: Port local mode tests to Tez - TestUnionOnSchema (daijy)
+
 PIG-4359: Port local mode tests to Tez - part4 (daijy)
 
 PIG-4340: PigStorage fails parsing empty map (daijy)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Jan 21 21:53:42 2015
@@ -608,6 +608,7 @@ public class TezOperator extends Operato
         private POStore store;
         private OutputDescriptor storeOutDescriptor;
         private VertexGroup vertexGroup;
+        private FileSpec sFile;
 
         public VertexGroupInfo() {
         }
@@ -659,6 +660,13 @@ public class TezOperator extends Operato
             this.vertexGroup = vertexGroup;
         }
 
+        public void setSFile(FileSpec sFile) {
+            this.sFile = sFile;
+        }
+
+        public FileSpec getSFile() {
+            return sFile;
+        }
     }
 }
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Wed Jan 21 21:53:42 2015
@@ -97,11 +97,28 @@ public class UnionOptimizer extends TezO
         // Union followed by Split followed by Store could have multiple stores
         List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
         TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()];
+        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
+        // Create a copy as disconnect while iterating modifies the original list
+        List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs);
+        
         for (int i=0; i < storeVertexGroupOps.length; i++) {
-            storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
-            storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
-            storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
-            tezPlan.add(storeVertexGroupOps[i]);
+            TezOperator existingVertexGroup = null;
+            if (successors != null) {
+                for (TezOperator succ : successors) {
+                    if (succ.isVertexGroup() && succ.getVertexGroupInfo().getSFile().equals(unionStoreOutputs.get(i).getSFile())) {
+                        existingVertexGroup = succ;
+                    }
+                }
+            }
+            if (existingVertexGroup != null) {
+                storeVertexGroupOps[i] = existingVertexGroup;
+            } else {
+                storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+                storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
+                storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
+                storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+                tezPlan.add(storeVertexGroupOps[i]);
+            }
         }
 
         // Case of split, orderby, skewed join, rank, etc will have multiple outputs
@@ -182,7 +199,6 @@ public class UnionOptimizer extends TezO
                 tezPlan.disconnect(pred, unionOp);
             }
 
-            List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
             List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
             for (TezOutput tezOutput : unionOutputs) {
                 if (tezOutput instanceof POValueOutputTez) {
@@ -243,9 +259,6 @@ public class UnionOptimizer extends TezO
             throw new VisitorException(e);
         }
 
-        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
-        // Create a copy as disconnect while iterating modifies the original list
-        List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs);
         if (successors != null) {
             // Successor inputs should now point to the vertex groups.
             for (TezOperator succ : successors) {

Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Wed Jan 21 21:53:42 2015
@@ -28,7 +28,6 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage;
@@ -101,8 +100,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaSameSchema() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaSameSchema() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
@@ -133,8 +132,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaFilter() throws IOException, ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaFilter() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, x : int);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
@@ -166,8 +165,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaSuccOps() throws IOException, ParserException 
{
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaSuccOps() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : int, y : int);"
@@ -199,8 +198,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaCastOnByteArray() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaCastOnByteArray() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i, j);"
             + " f1 = foreach l1 generate (int)i, (int)j;"
@@ -228,8 +227,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaScopedColumnName() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaScopedColumnName() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query_prefix = 
         "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " 
         + "g = group l1 by i; "
@@ -271,8 +270,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaScopedColumnNameBothInp1() throws 
IOException, ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaScopedColumnNameBothInp1() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query = 
         "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " 
         + "g1 = group l1 by i; "
@@ -307,8 +306,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaScopedColumnNameBothInp2() throws 
IOException, ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaScopedColumnNameBothInp2() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "   l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); " 
             + " l2 = load '" + INP_FILE_2NUMS + "' as (i : int, x : 
chararray); " 
@@ -345,7 +344,7 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaScopedColumnNameNeg() throws IOException, 
ParserException {
+    public void testUnionOnSchemaScopedColumnNameNeg() throws Exception {
         
         String expectedErr = "Found more than one match: l1::i, l2::i";
         String query_prefix =
@@ -371,8 +370,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaDiffNumType() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaDiffNumType() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : double);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : float);"
@@ -401,8 +400,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaNoCommonCols() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaNoCommonCols() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
@@ -429,8 +428,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaAdditionalColumn() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaAdditionalColumn() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
             + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " 
@@ -458,8 +457,8 @@ public class TestUnionOnSchema  {
     }
     
     @Test
-    public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws 
IOException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws 
Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         Data data = Storage.resetData(pig);
         
         // Use batch to force multiple outputs from relation l3. This causes 
@@ -503,8 +502,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchema3Inputs() throws IOException, ParserException 
{
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchema3Inputs() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int); "
             + "l2 = load '" + INP_FILE_2NUMS + "' as (i : double, x : int); "  
          
@@ -538,8 +537,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaByteArrayConversions() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaByteArrayConversions() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " 
             + "  (i : bytearray, x : bytearray, j : bytearray " 
@@ -577,7 +576,7 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaNoSchema() throws IOException, 
ParserException {
+    public void testUnionOnSchemaNoSchema() throws Exception {
         String expectedErr = "UNION ONSCHEMA cannot be used with " +
         "relations that have null schema";
         String query =
@@ -602,7 +601,7 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaNullAliasInFieldSchema() throws IOException, 
ParserException {
+    public void testUnionOnSchemaNullAliasInFieldSchema() throws Exception {
         String expectedErr = "Schema of relation f has a null fieldschema for " +
                        "column(s). Schema ::long,y:float";
         String query =
@@ -615,8 +614,8 @@ public class TestUnionOnSchema  {
     }
 
 
-    private void checkSchemaEx(String query, String expectedErr) throws 
IOException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    private void checkSchemaEx(String query, String expectedErr) throws 
Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
 
         boolean foundEx = false;
         try{
@@ -645,7 +644,7 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaIncompatibleTypes() throws IOException, 
ParserException {
+    public void testUnionOnSchemaIncompatibleTypes() throws Exception {
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : 
chararray);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
@@ -693,15 +692,15 @@ public class TestUnionOnSchema  {
     }
     
     
-    private void checkSchemaEquals(String query, Schema expectedSch) throws 
IOException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    private void checkSchemaEquals(String query, Schema expectedSch) throws 
Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         Util.registerMultiLineQuery(pig, query);
         Schema sch = pig.dumpSchema("u");
         assertEquals("Checking expected schema", expectedSch, sch);      
     }
 
 
-    private void checkSchemaEquals(String query, String schemaStr) throws 
IOException, ParserException {
+    private void checkSchemaEquals(String query, String schemaStr) throws 
Exception {
         Schema expectedSch = Utils.getSchemaFromString(schemaStr);
         checkSchemaEquals(query, expectedSch);       
     }
@@ -713,8 +712,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaInputUdfs() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaInputUdfs() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (i : int, j : chararray);"
@@ -750,8 +749,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaUdfTypeEvolution() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaUdfTypeEvolution() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query_prefix =
             "  l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " 
             + "  (i : int, c : chararray, j : int " 
@@ -802,8 +801,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaUdfTypeEvolution2() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaUdfTypeEvolution2() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query_prefix =
             "  l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " 
             + "  (i : int, c : chararray, j : int " 
@@ -874,8 +873,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testUnionOnSchemaScopeMulti() throws IOException, 
ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testUnionOnSchemaScopeMulti() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query_prefix =
             "  a = load '" + INP_FILE_2NUMS+ "' as (i:int, j:int); "
             + "b = group a by i; "
@@ -921,8 +920,8 @@ public class TestUnionOnSchema  {
      * @throws ParserException
      */
     @Test
-    public void testTwoUnions() throws IOException, ParserException {
-        PigServer pig = new PigServer(ExecType.LOCAL);
+    public void testTwoUnions() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =
             "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j : int);"
             + "l2 = load '" + INP_FILE_2NUMS + "' as (i : long, j : int);"

Modified: pig/trunk/test/tez-local-tests
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/tez-local-tests?rev=1653670&r1=1653669&r2=1653670&view=diff
==============================================================================
--- pig/trunk/test/tez-local-tests (original)
+++ pig/trunk/test/tez-local-tests Wed Jan 21 21:53:42 2015
@@ -83,3 +83,4 @@
 **/TestStoreLocal.java
 **/TestPOPartialAggPlanTez.java
 **/TestMultiQueryLocal.java
+**/TestUnionOnSchema.java


Reply via email to