Author: xuefu
Date: Tue Jun 30 03:12:43 2015
New Revision: 1688346

URL: http://svn.apache.org/r1688346
Log:
PIG-4615: Fix null keys join in SkewedJoin in spark mode (Liyun via Xuefu)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1688346&r1=1688345&r2=1688346&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java Tue Jun 30 03:12:43 2015
@@ -64,26 +64,26 @@ public class SkewedJoinConverter impleme
         RDD<Tuple> rdd1 = predecessors.get(0);
         RDD<Tuple> rdd2 = predecessors.get(1);
 
-        // make (key, value) pairs, key has type Object, value has type Tuple
-        RDD<Tuple2<Object, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
-                this, 0), SparkUtil.<Object, Tuple>getTuple2Manifest());
-        RDD<Tuple2<Object, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
-                this, 1), SparkUtil.<Object, Tuple>getTuple2Manifest());
+        // make (key, value) pairs, key has type IndexedKey, value has type Tuple
+        RDD<Tuple2<IndexedKey, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
+                this, 0), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
+        RDD<Tuple2<IndexedKey, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
+                this, 1), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
 
         // join fn is present in JavaPairRDD class ..
-        JavaPairRDD<Object, Tuple> rdd1Pair_javaRDD = new JavaPairRDD<Object, Tuple>(
-                rdd1Pair, SparkUtil.getManifest(Object.class),
+        JavaPairRDD<IndexedKey, Tuple> rdd1Pair_javaRDD = new JavaPairRDD<IndexedKey, Tuple>(
+                rdd1Pair, SparkUtil.getManifest(IndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
-        JavaPairRDD<Object, Tuple> rdd2Pair_javaRDD = new JavaPairRDD<Object, Tuple>(
-                rdd2Pair, SparkUtil.getManifest(Object.class),
+        JavaPairRDD<IndexedKey, Tuple> rdd2Pair_javaRDD = new JavaPairRDD<IndexedKey, Tuple>(
+                rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
 
         // join() returns (key, (t1, t2)) where (key, t1) is in this and (key, t2) is in other
-        JavaPairRDD<Object, Tuple2<Tuple, Tuple>> result_KeyValue = rdd1Pair_javaRDD
+        JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> result_KeyValue = rdd1Pair_javaRDD
                 .join(rdd2Pair_javaRDD);
 
-        // map to get JavaRDD<Tuple> from JAvaPairRDD<Object, Tuple2<Tuple, Tuple>> by
-        // ignoring the key (of type Object) and appending the values (the
+        // map to get JavaRDD<Tuple> from JAvaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
+        // ignoring the key (of type IndexedKey) and appending the values (the
         // Tuples)
         JavaRDD<Tuple> result = result_KeyValue
                 .mapPartitions(new ToValueFunction());
@@ -115,7 +115,7 @@ public class SkewedJoinConverter impleme
     }
 
     private static class ExtractKeyFunction extends
-            AbstractFunction1<Tuple, Tuple2<Object, Tuple>> implements
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
             Serializable {
 
         private final SkewedJoinConverter poSkewedJoin;
@@ -127,7 +127,7 @@ public class SkewedJoinConverter impleme
         }
 
         @Override
-        public Tuple2<Object, Tuple> apply(Tuple tuple) {
+        public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
 
             // attach tuple to LocalRearrange
             poSkewedJoin.LRs[LR_index].attachInput(tuple);
@@ -138,13 +138,14 @@ public class SkewedJoinConverter impleme
 
                 // If tuple is (AA, 5) and key index is $1, then it lrOut is 0 5
                 // (AA), so get(1) returns key
+                Byte index = (Byte)((Tuple) lrOut.result).get(0);
                 Object key = ((Tuple) lrOut.result).get(1);
-
+                IndexedKey indexedKey = new IndexedKey(index,key);
                 Tuple value = tuple;
 
                 // make a (key, value) pair
-                Tuple2<Object, Tuple> tuple_KeyValue = new Tuple2<Object, Tuple>(
-                        key, value);
+                Tuple2<IndexedKey, Tuple> tuple_KeyValue = new Tuple2<IndexedKey, Tuple>(
+                        indexedKey, value);
 
                 return tuple_KeyValue;
 
@@ -158,23 +159,23 @@ public class SkewedJoinConverter impleme
 
     private static class ToValueFunction
             implements
-            FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
+            FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 
-            Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> in;
+            Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
 
             Tuple2TransformIterable(
-                    Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                    Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
                 in = input;
             }
 
             public Iterator<Tuple> iterator() {
-                return new IteratorTransform<Tuple2<Object, Tuple2<Tuple, Tuple>>, Tuple>(
+                return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>, Tuple>(
                         in) {
                     @Override
                     protected Tuple transform(
-                            Tuple2<Object, Tuple2<Tuple, Tuple>> next) {
+                            Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
                         try {
 
                             Tuple leftTuple = next._2._1;
@@ -208,7 +209,7 @@ public class SkewedJoinConverter impleme
 
         @Override
         public Iterable<Tuple> call(
-                Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
             return new Tuple2TransformIterable(input);
         }
     }


Reply via email to