Author: xuefu
Date: Wed Jun 3 04:22:26 2015
New Revision: 1683220
URL: http://svn.apache.org/r1683220
Log:
PIG-4577: Use the Spark cogroup API to implement the 'groupby+secondarysort' case in
GlobalRearrangeConverter.java (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1683220&r1=1683219&r2=1683220&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
Wed Jun 3 04:22:26 2015
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFac
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.builtin.LOG;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -85,16 +84,16 @@ public class GlobalRearrangeConverter im
// if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
// return handleSecondarySort(predecessors.get(0), op, parallelism);
// }
+ List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new
ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
- return handleSecondarySort(predecessors.get(0), op, parallelism);
- }
-
- List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new
ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
- for (RDD<Tuple> rdd : predecessors) {
- JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd,
SparkUtil.getManifest(Tuple.class));
- JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new
ToKeyValueFunction());
- rddPairs.add(rddPair.rdd());
+ rddPairs.add(handleSecondarySort(predecessors.get(0), op,
parallelism).rdd());
+ } else {
+ for (RDD<Tuple> rdd : predecessors) {
+ JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd,
SparkUtil.getManifest(Tuple.class));
+ JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new
ToKeyValueFunction());
+ rddPairs.add(rddPair.rdd());
+ }
}
// Something's wrong with the type parameters of CoGroupedRDD
@@ -110,7 +109,7 @@ public class GlobalRearrangeConverter im
return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
}
- private RDD<Tuple> handleSecondarySort(
+ private JavaRDD<Tuple2<IndexedKey,Tuple>> handleSecondarySort(
RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) {
RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new
ToKeyNullValueFunction(),
@@ -124,11 +123,9 @@ public class GlobalRearrangeConverter im
JavaPairRDD<Tuple, Object> sorted =
pairRDD.repartitionAndSortWithinPartitions(
new HashPartitioner(parallelism),
new
PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
- JavaRDD<Tuple> mapped = sorted.mapPartitions(new
RemoveValueFunction());
- JavaPairRDD<Object, Iterable<Tuple>> prdd = mapped.groupBy(
- new GetKeyFunction(op), parallelism);
- JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
- return jrdd2.rdd();
+ JavaRDD<Tuple> jrdd = sorted.keys();
+ JavaRDD<Tuple2<IndexedKey,Tuple>> jrddPair = jrdd.map(new
ToKeyValueFunction(op));
+ return jrddPair;
}
private static class RemoveValueFunction implements
@@ -328,7 +325,7 @@ public class GlobalRearrangeConverter im
/**
* IndexedKey records the index and key info.
* This is used as key for JOINs. It addresses the case where key is
- * either empty (or is a tuple with one or more emoty fields). In this
case,
+ * either empty (or is a tuple with one or more empty fields). In this
case,
* we must respect the SQL standard as documented in the equals() method.
*/
public static class IndexedKey implements Serializable {
@@ -449,6 +446,16 @@ public class GlobalRearrangeConverter im
private static class ToKeyValueFunction implements
Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
+ private POGlobalRearrangeSpark glrSpark = null;
+
+ public ToKeyValueFunction(POGlobalRearrangeSpark glrSpark) {
+ this.glrSpark = glrSpark;
+ }
+
+ public ToKeyValueFunction() {
+
+ }
+
@Override
public Tuple2<IndexedKey, Tuple> call(Tuple t) {
try {
@@ -456,8 +463,15 @@ public class GlobalRearrangeConverter im
LOG.debug("ToKeyValueFunction in " + t);
}
+ Object key = null;
+ if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
+ key = ((Tuple) t.get(1)).get(0);
+ } else {
+ key = t.get(1);
+ }
+
Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(
- new IndexedKey((Byte) t.get(0), t.get(1)),
+ new IndexedKey((Byte) t.get(0), key),
(Tuple) t.get(2));
if (LOG.isDebugEnabled()) {
LOG.debug("ToKeyValueFunction out " + out);