Author: knoguchi
Date: Fri Nov 30 21:11:38 2018
New Revision: 1847856

URL: http://svn.apache.org/viewvc?rev=1847856&view=rev
Log:
PIG-5370: Union onschema + columnprune dropping used fields (knoguchi)
Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1847856&r1=1847855&r2=1847856&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Fri Nov 30 21:11:38 2018 @@ -88,6 +88,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5370: Union onschema + columnprune dropping used fields (knoguchi) + PIG-5362: Parameter substitution of shell cmd results doesn't handle backslash (wlauer via rohini) PIG-5355: Negative progress report by HBaseTableRecordReader (satishsaley via knoguchi) Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1847856&r1=1847855&r2=1847856&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original) +++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Fri Nov 30 21:11:38 2018 @@ -34,9 +34,11 @@ import org.apache.pig.newplan.logical.re public class LOUnion extends LogicalRelationalOperator { private boolean onSchema; + + private static String UID_SEPARATOR = "_"; // uid mapping from output uid to input uid - private List<Pair<Long, Long>> uidMapping = new ArrayList<Pair<Long, Long>>(); + private List<Pair<Long, String>> uidMapping = new ArrayList<Pair<Long, String>>(); public LOUnion(OperatorPlan plan) { super("LOUnion", plan); @@ -108,7 +110,7 @@ public class LOUnion extends LogicalRela } // Bring back cached uid if any; otherwise, cache uid generated - setMergedSchemaUids(mergedSchema, inputSchemas); + setMergedSchemaUids(mergedSchema, inputSchemas, ""); return schema = mergedSchema; } @@ -145,7 +147,7 @@ 
public class LOUnion extends LogicalRela return mergedSchema; } - private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas) + private void setMergedSchemaUids(LogicalSchema mergedSchema, List<LogicalSchema> inputSchemas, String nested_uids) throws FrontendException { for (int i=0;i<mergedSchema.size();i++) { @@ -170,7 +172,7 @@ public class LOUnion extends LogicalRela } if (uid < 0) { - uid = getCachedOuputUid(inputFieldSchema.uid); + uid = getCachedOuputUid(createNestedUids(nested_uids,inputFieldSchema.uid)); if (uid >= 0 && outputFieldSchema.schema == null) break; } } @@ -186,13 +188,13 @@ public class LOUnion extends LogicalRela matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias); if (matchedInputFieldSchema!=null) { inputUid = matchedInputFieldSchema.uid; - uidMapping.add(new Pair<Long, Long>(uid, inputUid)); + uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid))); } } else { matchedInputFieldSchema = mergedSchema.getField(i); inputUid = inputSchema.getField(i).uid; - uidMapping.add(new Pair<Long, Long>(uid, inputUid)); + uidMapping.add(new Pair<Long, String>(uid, createNestedUids(nested_uids,inputUid))); } } } @@ -201,16 +203,28 @@ public class LOUnion extends LogicalRela // This field has a schema. 
Assign uids to it as well if (outputFieldSchema.schema != null) { - setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas); + setMergedSchemaUids(outputFieldSchema.schema, fieldInputSchemas, createNestedUids(nested_uids,outputFieldSchema.uid)); } } } - private long getCachedOuputUid(long inputUid) { + private String createNestedUids(String nested_uids, long new_uid) { + StringBuilder sb = new StringBuilder(nested_uids); + sb.append(UID_SEPARATOR); + sb.append(new_uid); + return sb.toString(); + } + + private long getLeafUid(String nested_uids) { + String [] uid_root_to_leaf = nested_uids.split(UID_SEPARATOR); + return Long.valueOf(uid_root_to_leaf[uid_root_to_leaf.length-1]); + } + + private long getCachedOuputUid(String nested_input_uids) { long uid = -1; - for (Pair<Long, Long> pair : uidMapping) { - if (pair.second==inputUid) { + for (Pair<Long, String> pair : uidMapping) { + if (pair.second.equals(nested_input_uids)) { uid = pair.first; break; } @@ -237,18 +251,18 @@ public class LOUnion extends LogicalRela } // Get input uids mapping to the output uid - public Set<Long> getInputUids(long uid) { + public Set<Long> getInputUids(long outputuid) { Set<Long> result = new HashSet<Long>(); - for (Pair<Long, Long> pair : uidMapping) { - if (pair.first==uid) - result.add(pair.second); + for (Pair<Long, String> pair : uidMapping) { + if (pair.first==outputuid) + result.add(getLeafUid(pair.second)); } return result; } @Override public void resetUid() { - uidMapping = new ArrayList<Pair<Long, Long>>(); + uidMapping = new ArrayList<Pair<Long, String>>(); } public List<Operator> getInputs() { Modified: pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java?rev=1847856&r1=1847855&r2=1847856&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java (original) +++ 
pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java Fri Nov 30 21:11:38 2018 @@ -482,6 +482,35 @@ public class TestNewPlanColumnPrune { } } + @Test + public void testUnionOnschemaWithInnerBag() throws Exception { + // After handing inner-bag in Union-onschema, + // ColumnPrune broke due to overlapping uid inside the relation and + // ones inside the inner-bag (PIG-5370) + String query = "A0 = load 'd.txt' as (a0:int, a1:int, a2:int, a3:int);" + + "A = FOREACH A0 GENERATE a0, a1, a2;" + + "B = FOREACH (GROUP A by (a0,a1)) {" + + " A_FOREACH = FOREACH A GENERATE a1,a2;" + + " GENERATE A, FLATTEN(A_FOREACH) as (a1,a2);" + + "}" + + "C = load 'd2.txt' as (A:bag{tuple:(a0:int, a1:int, a2:int)}, a1:int,a2:int);" + + "Z = UNION ONSCHEMA B, C;" + + "store Z into 'empty';"; + + LogicalPlan newLogicalPlan = buildPlan(query); + + PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3); + optimizer.optimize(); + System.err.println(newLogicalPlan); + Iterator<Operator> iter = newLogicalPlan.getOperators(); + while (iter.hasNext()) { + Operator o = iter.next(); + LogicalRelationalOperator lro = (LogicalRelationalOperator)o; + if (lro == null || lro.getAlias() == null) continue; + assertNotNull(lro.getSchema()); + } + } + public class MyPlanOptimizer extends LogicalPlanOptimizer { protected MyPlanOptimizer(OperatorPlan p, int iterations) { @@ -505,4 +534,3 @@ public class TestNewPlanColumnPrune { } } } -