Author: xuefu
Date: Wed Jun  3 04:22:26 2015
New Revision: 1683220

URL: http://svn.apache.org/r1683220
Log:
PIG-4577: Use the Spark cogroup API to implement the 'groupby+secondarysort' case in
GlobalRearrangeConverter.java (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1683220&r1=1683219&r2=1683220&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
 Wed Jun  3 04:22:26 2015
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
-import org.apache.pig.builtin.LOG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -85,16 +84,16 @@ public class GlobalRearrangeConverter im
 //        if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
 //            return handleSecondarySort(predecessors.get(0), op, parallelism);
 //        }
+        List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new 
ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
 
         if (predecessors.size() == 1 && op.isUseSecondaryKey()) {
-            return handleSecondarySort(predecessors.get(0), op, parallelism);
-        }
-
-        List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new 
ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
-        for (RDD<Tuple> rdd : predecessors) {
-            JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, 
SparkUtil.getManifest(Tuple.class));
-            JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new 
ToKeyValueFunction());
-            rddPairs.add(rddPair.rdd());
+            rddPairs.add(handleSecondarySort(predecessors.get(0), op, 
parallelism).rdd());
+        } else {
+            for (RDD<Tuple> rdd : predecessors) {
+                JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, 
SparkUtil.getManifest(Tuple.class));
+                JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new 
ToKeyValueFunction());
+                rddPairs.add(rddPair.rdd());
+            }
         }
 
         // Something's wrong with the type parameters of CoGroupedRDD
@@ -110,7 +109,7 @@ public class GlobalRearrangeConverter im
         return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
     }
 
-    private RDD<Tuple> handleSecondarySort(
+    private JavaRDD<Tuple2<IndexedKey,Tuple>> handleSecondarySort(
             RDD<Tuple> rdd, POGlobalRearrangeSpark op, int parallelism) {
 
         RDD<Tuple2<Tuple, Object>> rddPair = rdd.map(new 
ToKeyNullValueFunction(),
@@ -124,11 +123,9 @@ public class GlobalRearrangeConverter im
         JavaPairRDD<Tuple, Object> sorted = 
pairRDD.repartitionAndSortWithinPartitions(
                 new HashPartitioner(parallelism),
                 new 
PigSecondaryKeyComparatorSpark(op.getSecondarySortOrder()));
-        JavaRDD<Tuple> mapped = sorted.mapPartitions(new 
RemoveValueFunction());
-        JavaPairRDD<Object, Iterable<Tuple>> prdd = mapped.groupBy(
-                new GetKeyFunction(op), parallelism);
-        JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(op));
-        return jrdd2.rdd();
+        JavaRDD<Tuple> jrdd = sorted.keys();
+        JavaRDD<Tuple2<IndexedKey,Tuple>> jrddPair = jrdd.map(new 
ToKeyValueFunction(op));
+        return jrddPair;
     }
 
     private static class RemoveValueFunction implements
@@ -328,7 +325,7 @@ public class GlobalRearrangeConverter im
     /**
      * IndexedKey records the index and key info.
      * This is used as key for JOINs. It addresses the case where key is
-     * either empty (or is a tuple with one or more emoty fields). In this 
case,
+     * either empty (or is a tuple with one or more empty fields). In this 
case,
      * we must respect the SQL standard as documented in the equals() method.
      */
     public static class IndexedKey implements Serializable {
@@ -449,6 +446,16 @@ public class GlobalRearrangeConverter im
     private static class ToKeyValueFunction implements
             Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
 
+        private POGlobalRearrangeSpark glrSpark = null;
+
+        public ToKeyValueFunction(POGlobalRearrangeSpark glrSpark) {
+            this.glrSpark = glrSpark;
+        }
+
+        public ToKeyValueFunction() {
+
+        }
+
         @Override
         public Tuple2<IndexedKey, Tuple> call(Tuple t) {
             try {
@@ -456,8 +463,15 @@ public class GlobalRearrangeConverter im
                     LOG.debug("ToKeyValueFunction in " + t);
                 }
 
+                Object key = null;
+                if ((glrSpark != null) && (glrSpark.isUseSecondaryKey())) {
+                    key = ((Tuple) t.get(1)).get(0);
+                } else {
+                    key = t.get(1);
+                }
+
                 Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(
-                        new IndexedKey((Byte) t.get(0), t.get(1)),
+                        new IndexedKey((Byte) t.get(0), key),
                         (Tuple) t.get(2));
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ToKeyValueFunction out " + out);


Reply via email to