Author: xuefu
Date: Thu Jun 11 13:18:26 2015
New Revision: 1684882

URL: http://svn.apache.org/r1684882
Log:
PIG-4596: Fix unit test failures about MergeJoinConverter in spark mode (Liyun via Xuefu)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1684882&r1=1684881&r2=1684882&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Thu Jun 11 13:18:26 2015
@@ -52,25 +52,25 @@ public class MergeJoinConverter implemen
         RDD<Tuple> rdd1 = predecessors.get(0);
         RDD<Tuple> rdd2 = predecessors.get(1);
 
-        // make (key, value) pairs, key has type Object, value has type Tuple
-        RDD<Tuple2<Object, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
-                poMergeJoin, 0), SparkUtil.<Object, Tuple>getTuple2Manifest());
-        RDD<Tuple2<Object, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
-                poMergeJoin, 1), SparkUtil.<Object, Tuple>getTuple2Manifest());
+        // make (key, value) pairs, key has type IndexedKey, value has type Tuple
+        RDD<Tuple2<IndexedKey, Tuple>> rdd1Pair = rdd1.map(new 
ExtractKeyFunction(
+                poMergeJoin, 0), SparkUtil.<IndexedKey, 
Tuple>getTuple2Manifest());
+        RDD<Tuple2<IndexedKey, Tuple>> rdd2Pair = rdd2.map(new 
ExtractKeyFunction(
+                poMergeJoin, 1), SparkUtil.<IndexedKey, 
Tuple>getTuple2Manifest());
 
-        JavaPairRDD<Object, Tuple> prdd1 = new JavaPairRDD<Object, Tuple>(
-                rdd1Pair, SparkUtil.getManifest(Object.class),
+        JavaPairRDD<IndexedKey, Tuple> prdd1 = new JavaPairRDD<IndexedKey, 
Tuple>(
+                rdd1Pair, SparkUtil.getManifest(IndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
-        JavaPairRDD<Object, Tuple> prdd2 = new JavaPairRDD<Object, Tuple>(
-                rdd2Pair, SparkUtil.getManifest(Object.class),
+        JavaPairRDD<IndexedKey, Tuple> prdd2 = new JavaPairRDD<IndexedKey, 
Tuple>(
+                rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
 
-        JavaPairRDD<Object, Tuple2<Tuple, Tuple>> jrdd = prdd1
+        JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> jrdd = prdd1
                 .join(prdd2);
 
         // map to get JavaRDD<Tuple> from join() output, which is
-        // JavaPairRDD<Object, Tuple2<Tuple, Tuple>> by
-        // ignoring the key (of type Object) and appending the values (the
+        // JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
+        // ignoring the key (of type IndexedKey) and appending the values (the
         // Tuples)
         JavaRDD<Tuple> result = jrdd
                 .mapPartitions(new ToValueFunction());
@@ -79,7 +79,7 @@ public class MergeJoinConverter implemen
     }
 
     private static class ExtractKeyFunction extends
-            AbstractFunction1<Tuple, Tuple2<Object, Tuple>> implements
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
             Serializable {
 
         private final POMergeJoin poMergeJoin;
@@ -91,7 +91,7 @@ public class MergeJoinConverter implemen
         }
 
         @Override
-        public Tuple2<Object, Tuple> apply(Tuple tuple) {
+        public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
             poMergeJoin.getLRs()[LR_index].attachInput(tuple);
 
             try {
@@ -105,9 +105,10 @@ public class MergeJoinConverter implemen
 
                 // If tuple is (AA, 5) and key index is $1, then it lrOut is 0 5 (AA),
                 // so get(1) returns key
+                Object index =  ((Tuple) lrOut.result).get(0);
                 Object key = ((Tuple) lrOut.result).get(1);
                 Tuple value = tuple;
-                Tuple2<Object, Tuple> tuple_KeyValue = new Tuple2<Object, 
Tuple>(key,
+                Tuple2<IndexedKey, Tuple> tuple_KeyValue = new 
Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte)index,key),
                         value);
 
                 return tuple_KeyValue;
@@ -119,23 +120,23 @@ public class MergeJoinConverter implemen
     }
 
     private static class ToValueFunction
-            implements FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, 
Tuple>>>, Tuple>, Serializable {
+            implements FlatMapFunction<Iterator<Tuple2<IndexedKey, 
Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 
-            Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> in;
+            Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
 
             Tuple2TransformIterable(
-                    Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                    Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
                 in = input;
             }
 
             public Iterator<Tuple> iterator() {
-                return new IteratorTransform<Tuple2<Object, Tuple2<Tuple, 
Tuple>>, Tuple>(
+                return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple, 
Tuple>>, Tuple>(
                         in) {
                     @Override
                     protected Tuple transform(
-                            Tuple2<Object, Tuple2<Tuple, Tuple>> next) {
+                            Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
                         try {
 
                             Tuple leftTuple = next._2()._1();
@@ -165,7 +166,7 @@ public class MergeJoinConverter implemen
 
         @Override
         public Iterable<Tuple> call(
-                Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
             return new Tuple2TransformIterable(input);
         }
     }


Reply via email to