> On Dec. 16, 2015, 6:57 a.m., kelly zhang wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java, line 195
> > <https://reviews.apache.org/r/40743/diff/2/?file=1155796#file1155796line195>
> >
> >     Why not add the following code:
> >
> >         // Ensure output is consistent with the output of KeyValueFunction
> >         result.append(t.get(0));
> >         Tuple valueTuple = tf.newTuple();
> >         for (Object o : ((Tuple) r.result).getAll()) {
> >             if (!o.equals(key)) {
> >                 valueTuple.append(o);
> >             }
> >         }
> >         result.append(valueTuple);
> >
> >     I think returning r is ok.
If we return r, the result will be this: (ABC,(2),(3)) - a tuple with the key followed by the values. If we return result, it will look like this: (ABC,((2),(3))) - a tuple with the key and a value tuple (containing the values). The latter is what we want, hence the jugglery there. I can add a comment in the code to make this clear. (A short sketch of the restructuring follows the quoted request below.)


- Pallavi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/40743/#review109469
-----------------------------------------------------------


On Dec. 9, 2015, 5:49 a.m., Pallavi Rao wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/40743/
> -----------------------------------------------------------
> 
> (Updated Dec. 9, 2015, 5:49 a.m.)
> 
> 
> Review request for pig, Mohit Sabharwal and Xuefu Zhang.
> 
> 
> Bugs: PIG-4709
>     https://issues.apache.org/jira/browse/PIG-4709
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Currently, the GROUP BY operator of Pig is mapped to Spark's CoGroup. When
> the grouped data is consumed by subsequent operations that perform algebraic
> operations, this is sub-optimal, as it generates a lot of shuffle traffic.
> The Spark plan should be optimized to use reduceBy where possible, so that a
> combiner is used.
> 
> Introduced a combiner optimizer that does the following:
> // Checks for algebraic operations and, if they exist,
> // replaces global rearrange (cogroup) with reduceBy as follows:
> // Input:
> //     foreach (using algebraicOp)
> //         -> packager
> //             -> globalRearrange
> //                 -> localRearrange
> // Output:
> //     foreach (using algebraicOp.Final)
> //         -> reduceBy (uses algebraicOp.Intermediate)
> //             -> foreach (using algebraicOp.Initial)
> //                 -> localRearrange
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java f8c1658
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORollupHIIForEach.java aca347d
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java a4dbadd
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java 5f74992
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java 9ce0492
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PreCombinerLocalRearrangeConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SparkCombinerOptimizer.java PRE-CREATION
>   src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java 6b66ca1
>   src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java 546d91e
>   test/org/apache/pig/test/TestCombiner.java df44293
> 
> Diff: https://reviews.apache.org/r/40743/diff/
> 
> 
> Testing
> -------
> 
> The patch unblocked one UT in TestCombiner. Added another UT in the same
> class. Also did some manual testing.
> 
> 
> Thanks,
> 
> Pallavi Rao
> 
>
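For readers following the thread, here is a minimal, illustrative sketch of the restructuring discussed above, assuming Pig's Tuple/TupleFactory API. The class and method names are hypothetical, and the variable names simply mirror the snippet quoted in the review; this is not the actual patch code.

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class RestructureSketch {
        private static final TupleFactory tf = TupleFactory.getInstance();

        // Turn a reduced tuple of the form (key, v1, v2, ...) into
        // (key, (v1, v2, ...)), i.e. (ABC,(2),(3)) -> (ABC,((2),(3))),
        // so the output matches what KeyValueFunction produces.
        public static Tuple keyAndValueTuple(Tuple reduced) throws ExecException {
            Tuple result = tf.newTuple();
            Object key = reduced.get(0);
            result.append(key);                 // key first
            Tuple valueTuple = tf.newTuple();
            for (Object o : reduced.getAll()) {
                if (!o.equals(key)) {           // skip the key, keep the values
                    valueTuple.append(o);
                }
            }
            result.append(valueTuple);          // then one tuple holding all values
            return result;
        }
    }

Note that the equality-based skip mirrors the quoted snippet; a positional skip over fields 1..n would be safer if a value could ever equal the key.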

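For context on the optimizer rewrite described in the quoted request, here is a conceptual sketch of the combiner pattern it targets, expressed directly against Spark's JavaPairRDD API. The COUNT-style stages stand in for Pig's algebraic Initial/Intermediate/Final UDF interfaces, and the class and method names are hypothetical; this illustrates the idea, not the converter code.

    import org.apache.spark.api.java.JavaPairRDD;

    public class ReduceBySketch {
        // Mirrors: foreach(Initial) -> reduceBy(Intermediate) -> foreach(Final)
        public static JavaPairRDD<String, Long> countPerKey(
                JavaPairRDD<String, String> keyed) {
            return keyed
                    // Initial: map each record to a partial result (a count of 1).
                    .mapValues(v -> 1L)
                    // Intermediate: reduceByKey combines partials map-side before
                    // the shuffle, which is what cuts the shuffle traffic
                    // relative to the CoGroup-based plan.
                    .reduceByKey((a, b) -> a + b)
                    // Final: fold the combined partials into the final value
                    // (the identity for COUNT).
                    .mapValues(partial -> partial);
        }
    }

With the original CoGroup mapping, every (key, value) pair crosses the network before any aggregation; with reduceByKey, each partition ships only one partial result per key.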