Author: knoguchi
Date: Wed Dec 27 15:55:23 2017
New Revision: 1819344

URL: http://svn.apache.org/viewvc?rev=1819344&view=rev
Log:
PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via 
knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1819344&r1=1819343&r2=1819344&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Dec 27 15:55:23 2017
@@ -59,6 +59,8 @@ PIG-5251: Bump joda-time to 2.9.9 (dbist
 OPTIMIZATIONS
  
 BUG FIXES
+PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via 
knoguchi)
+
 PIG-5300: hashCode for Bag needs to be order independent (knoguchi)
 
 PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1819344&r1=1819343&r2=1819344&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java 
(original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Wed 
Dec 27 15:55:23 2017
@@ -108,11 +108,51 @@ public class LOUnion extends LogicalRela
         }
 
         // Bring back cached uid if any; otherwise, cache uid generated
-        for (int i=0;i<mergedSchema.size();i++)
-        {
+        setMergedSchemaUids(mergedSchema, inputSchemas);
+
+        return schema = mergedSchema;
+    }
+
+    /**
+     * create schema for union-onschema
+     */
+    private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> 
inputSchemas,
+            List<String> inputAliases)
+    throws FrontendException {
+        ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
+        for (int i = 0; i < inputSchemas.size(); i++){
+            LogicalSchema sch = inputSchemas.get(i);
+            for( LogicalFieldSchema fs : sch.getFields() ) {
+                if(fs.alias == null){
+                    String msg = "Schema of relation " + inputAliases.get(i)
+                        + " has a null fieldschema for column(s). Schema :" + 
sch.toString(false);
+                    throw new FrontendException( this, msg, 1116, 
PigException.INPUT );
+                }
+            }
+            schemas.add( sch );
+        }
+
+        //create the merged schema
+        LogicalSchema mergedSchema = null;
+        try {
+            mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );
+        } catch(FrontendException e)                 {
+            String msg = "Error merging schemas for union operator : "
+                + e.getMessage();
+            throw new FrontendException(this, msg, 1116, PigException.INPUT, 
e);
+        }
+
+        return mergedSchema;
+    }
+
+    private void setMergedSchemaUids(LogicalSchema mergedSchema, 
List<LogicalSchema> inputSchemas)
+    throws FrontendException {
+
+        for (int i=0;i<mergedSchema.size();i++) {
             LogicalSchema.LogicalFieldSchema outputFieldSchema = 
mergedSchema.getField(i);
 
             long uid = -1;
+            List<LogicalSchema> fieldInputSchemas = new 
ArrayList<>(inputSchemas.size());
             
             // Search all the cached uid mappings by input field to see if 
             // we've cached an output uid for this output field
@@ -125,8 +165,14 @@ public class LOUnion extends LogicalRela
                 }
                 
                 if (inputFieldSchema != null) {
-                    uid = getCachedOuputUid(inputFieldSchema.uid);
-                    if (uid >= 0) break;
+                    if (inputFieldSchema.schema != null) {
+                        fieldInputSchemas.add(inputFieldSchema.schema);
+                    }
+
+                    if (uid < 0) {
+                        uid = getCachedOuputUid(inputFieldSchema.uid);
+                        if (uid >= 0 && outputFieldSchema.schema == null) 
break;
+                    }
                 }
             }
             
@@ -136,8 +182,8 @@ public class LOUnion extends LogicalRela
                 for (LogicalSchema inputSchema : inputSchemas) {
                     long inputUid;
                     LogicalFieldSchema matchedInputFieldSchema;
-                       if (onSchema) {
-                           matchedInputFieldSchema = 
inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
+                    if (onSchema) {
+                        matchedInputFieldSchema = 
inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
                         if (matchedInputFieldSchema!=null) {
                             inputUid = matchedInputFieldSchema.uid;
                             uidMapping.add(new Pair<Long, Long>(uid, 
inputUid));
@@ -145,50 +191,21 @@ public class LOUnion extends LogicalRela
                     }
                     else {
                         matchedInputFieldSchema = mergedSchema.getField(i);
-                               inputUid = inputSchema.getField(i).uid;
-                               uidMapping.add(new Pair<Long, Long>(uid, 
inputUid));
+                        inputUid = inputSchema.getField(i).uid;
+                        uidMapping.add(new Pair<Long, Long>(uid, inputUid));
                     }
                 }
             }
 
             outputFieldSchema.uid = uid;
-        }
-        
-        return schema = mergedSchema;
-    }
 
-    /**
-     * create schema for union-onschema
-     */
-    private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> 
inputSchemas, 
-            List<String> inputAliases) 
-    throws FrontendException {
-        ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
-        for (int i = 0; i < inputSchemas.size(); i++){
-            LogicalSchema sch = inputSchemas.get(i);
-            for( LogicalFieldSchema fs : sch.getFields() ) {
-                if(fs.alias == null){
-                    String msg = "Schema of relation " + inputAliases.get(i)
-                        + " has a null fieldschema for column(s). Schema :" + 
sch.toString(false);
-                    throw new FrontendException( this, msg, 1116, 
PigException.INPUT );
-                }
+            // This field has a schema. Assign uids to it as well
+            if (outputFieldSchema.schema != null) {
+                setMergedSchemaUids(outputFieldSchema.schema, 
fieldInputSchemas);
             }
-            schemas.add( sch );
         }
-        
-        //create the merged schema
-        LogicalSchema mergedSchema = null;
-        try {
-            mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas );   
-        } catch(FrontendException e)                 {
-            String msg = "Error merging schemas for union operator : "
-                + e.getMessage();
-            throw new FrontendException(this, msg, 1116, PigException.INPUT, 
e);
-        }
-        
-        return mergedSchema;
     }
-    
+
     private long getCachedOuputUid(long inputUid) {
         long uid = -1;
         

Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1819344&r1=1819343&r2=1819344&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Wed Dec 27 
15:55:23 2017
@@ -478,6 +478,43 @@ public class TestUnionOnSchema  {
      * Test UNION ONSCHEMA on 3 inputs 
      */
     @Test
+    public void testUnionOnSchemaInnerSchema() throws Exception {
+        PigServer pig = new PigServer(Util.getLocalTestMode());
+        String query =
+            "  l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : long, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
+            + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as "
+            + "  (i : long, c : chararray, j : int "
+            +       ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); "
+            + "u = union onschema l1, l2; "
+            // The addition in the inner foreach will fail if the inner 
schema's uids
+            // are all set to -1, since the code that finds the inner load's 
schema will
+            // match the last item in b's schema, which is a chararray
+            + "p = foreach u { x = foreach b GENERATE c1 + 5 as c3; GENERATE 
i, c, x; }";
+
+        Util.registerMultiLineQuery(pig, query);
+        pig.explain("p", System.out);
+
+        Iterator<Tuple> it = pig.openIterator("p");
+
+        List<Tuple> expectedRes =
+            Util.getTuplesFromConstantTupleStrings(
+                    new String[] {
+                        "(1L,'abc',{(6),(6)})",
+                        "(5L,'def',{(7),(7)})",
+                        "(1L,'abc',{(6),(6)})",
+                        "(5L,'def',{(7),(7)})"
+                    });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
+
+    /**
+     * Test UNION ONSCHEMA on 3 inputs
+     * @throws IOException
+     * @throws ParserException
+     */
+    @Test
     public void testUnionOnSchema3Inputs() throws Exception {
         PigServer pig = new PigServer(Util.getLocalTestMode());
         String query =


Reply via email to