Author: knoguchi Date: Wed Dec 27 15:55:23 2017 New Revision: 1819344 URL: http://svn.apache.org/viewvc?rev=1819344&view=rev Log: PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1819344&r1=1819343&r2=1819344&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Wed Dec 27 15:55:23 2017 @@ -59,6 +59,8 @@ PIG-5251: Bump joda-time to 2.9.9 (dbist OPTIMIZATIONS BUG FIXES +PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi) + PIG-5300: hashCode for Bag needs to be order independent (knoguchi) PIG-5318: Unit test failures on Pig on Spark with Spark 2.2 (nkollar via szita) Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1819344&r1=1819343&r2=1819344&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Wed Dec 27 15:55:23 2017 @@ -108,11 +108,51 @@ public class LOUnion extends LogicalRela } // Bring back cached uid if any; otherwise, cache uid generated - for (int i=0;i<mergedSchema.size();i++) - { + setMergedSchemaUids(mergedSchema, inputSchemas); + + return schema = mergedSchema; + } + + /** + * create schema for union-onschema + */ + private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas, + List<String> inputAliases) + throws FrontendException { + ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>(); + for (int i = 0; i < inputSchemas.size(); i++){ + LogicalSchema sch = inputSchemas.get(i); + for( LogicalFieldSchema fs : sch.getFields() ) { + if(fs.alias == null){ + String msg = "Schema of relation " + inputAliases.get(i) + + " has a null fieldschema for column(s). Schema :" + sch.toString(false); + throw new FrontendException( this, msg, 1116, PigException.INPUT ); + } + } + schemas.add( sch ); + } + + //create the merged schema + LogicalSchema mergedSchema = null; + try { + mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas ); + } catch(FrontendException e) { + String msg = "Error merging schemas for union operator : " + + e.getMessage(); + throw new FrontendException(this, msg, 1116, PigException.INPUT, e); + } + + return mergedSchema; + } + + private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas) + throws FrontendException { + + for (int i=0;i<mergedSchema.size();i++) { LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i); long uid = -1; + List<LogicalSchema> fieldInputSchemas = new ArrayList<>(inputSchemas.size()); // Search all the cached uid mappings by input field to see if // we've cached an output uid for this output field @@ -125,8 +165,14 @@ public class LOUnion extends LogicalRela } if (inputFieldSchema != null) { - uid = getCachedOuputUid(inputFieldSchema.uid); - if (uid >= 0) break; + if (inputFieldSchema.schema != null) { + fieldInputSchemas.add(inputFieldSchema.schema); + } + + if (uid < 0) { + uid = getCachedOuputUid(inputFieldSchema.uid); + if (uid >= 0 && outputFieldSchema.schema == null) break; + } } } @@ -136,8 +182,8 @@ public class LOUnion extends LogicalRela for (LogicalSchema inputSchema : inputSchemas) { long inputUid; LogicalFieldSchema matchedInputFieldSchema; - if (onSchema) { - matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias); + if (onSchema) { + matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias); if (matchedInputFieldSchema!=null) { inputUid = matchedInputFieldSchema.uid; uidMapping.add(new Pair<Long, Long>(uid, inputUid)); @@ -145,50 +191,21 @@ public class LOUnion extends LogicalRela } else { matchedInputFieldSchema = mergedSchema.getField(i); - inputUid = inputSchema.getField(i).uid; - uidMapping.add(new Pair<Long, Long>(uid, inputUid)); + inputUid = inputSchema.getField(i).uid; + uidMapping.add(new Pair<Long, Long>(uid, inputUid)); } } } outputFieldSchema.uid = uid; - } - - return schema = mergedSchema; - } - /** - * create schema for union-onschema - */ - private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas, - List<String> inputAliases) - throws FrontendException { - ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>(); - for (int i = 0; i < inputSchemas.size(); i++){ - LogicalSchema sch = inputSchemas.get(i); - for( LogicalFieldSchema fs : sch.getFields() ) { - if(fs.alias == null){ - String msg = "Schema of relation " + inputAliases.get(i) - + " has a null fieldschema for column(s). Schema :" + sch.toString(false); - throw new FrontendException( this, msg, 1116, PigException.INPUT ); - } + // This field has a schema. Assign uids to it as well + if (outputFieldSchema.schema != null) { + setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas); } - schemas.add( sch ); } - - //create the merged schema - LogicalSchema mergedSchema = null; - try { - mergedSchema = LogicalSchema.mergeSchemasByAlias( schemas ); - } catch(FrontendException e) { - String msg = "Error merging schemas for union operator : " - + e.getMessage(); - throw new FrontendException(this, msg, 1116, PigException.INPUT, e); - } - - return mergedSchema; } - + private long getCachedOuputUid(long inputUid) { long uid = -1; Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1819344&r1=1819343&r2=1819344&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original) +++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Wed Dec 27 15:55:23 2017 @@ -478,6 +478,43 @@ public class TestUnionOnSchema { * Test UNION ONSCHEMA on 3 inputs */ @Test + public void testUnionOnSchemaInnerSchema() throws Exception { + PigServer pig = new PigServer(Util.getLocalTestMode()); + String query = + " l1 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + + " (i : long, c : chararray, j : int " + + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); " + + "l2 = load '" + INP_FILE_2NUM_1CHAR_1BAG + "' as " + + " (i : long, c : chararray, j : int " + + ", b : bag { t : tuple (c1 : int, c2 : chararray)} ); " + + "u = union onschema l1, l2; " + // The addition in the inner foreach will fail if the inner schema's uids + // are all set to -1, since the code that finds the inner load's schema will + // match the last item in b's schema, which is a chararray + + "p = foreach u { x = foreach b GENERATE c1 + 5 as c3; GENERATE i, c, x; }"; + + Util.registerMultiLineQuery(pig, query); + pig.explain("p", System.out); + + Iterator<Tuple> it = pig.openIterator("p"); + + List<Tuple> expectedRes = + Util.getTuplesFromConstantTupleStrings( + new String[] { + "(1L,'abc',{(6),(6)})", + "(5L,'def',{(7),(7)})", + "(1L,'abc',{(6),(6)})", + "(5L,'def',{(7),(7)})" + }); + Util.checkQueryOutputsAfterSort(it, expectedRes); + } + + /** + * Test UNION ONSCHEMA on 3 inputs + * @throws IOException + * @throws ParserException + */ + @Test public void testUnionOnSchema3Inputs() throws Exception { PigServer pig = new PigServer(Util.getLocalTestMode()); String query =