Author: xuefu
Date: Thu Jun 11 13:18:26 2015
New Revision: 1684882
URL: http://svn.apache.org/r1684882
Log:
PIG-4596: Fix unit test failures about MergeJoinConverter in spark mode (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java?rev=1684882&r1=1684881&r2=1684882&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java Thu Jun 11 13:18:26 2015
@@ -52,25 +52,25 @@ public class MergeJoinConverter implemen
RDD<Tuple> rdd1 = predecessors.get(0);
RDD<Tuple> rdd2 = predecessors.get(1);
- // make (key, value) pairs, key has type Object, value has type Tuple
- RDD<Tuple2<Object, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
- poMergeJoin, 0), SparkUtil.<Object, Tuple>getTuple2Manifest());
- RDD<Tuple2<Object, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
- poMergeJoin, 1), SparkUtil.<Object, Tuple>getTuple2Manifest());
+ // make (key, value) pairs, key has type IndexedKey, value has type Tuple
+ RDD<Tuple2<IndexedKey, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
+ poMergeJoin, 0), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
+ RDD<Tuple2<IndexedKey, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
+ poMergeJoin, 1), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
- JavaPairRDD<Object, Tuple> prdd1 = new JavaPairRDD<Object, Tuple>(
- rdd1Pair, SparkUtil.getManifest(Object.class),
+ JavaPairRDD<IndexedKey, Tuple> prdd1 = new JavaPairRDD<IndexedKey, Tuple>(
+ rdd1Pair, SparkUtil.getManifest(IndexedKey.class),
SparkUtil.getManifest(Tuple.class));
- JavaPairRDD<Object, Tuple> prdd2 = new JavaPairRDD<Object, Tuple>(
- rdd2Pair, SparkUtil.getManifest(Object.class),
+ JavaPairRDD<IndexedKey, Tuple> prdd2 = new JavaPairRDD<IndexedKey, Tuple>(
+ rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
SparkUtil.getManifest(Tuple.class));
- JavaPairRDD<Object, Tuple2<Tuple, Tuple>> jrdd = prdd1
+ JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> jrdd = prdd1
.join(prdd2);
// map to get JavaRDD<Tuple> from join() output, which is
- // JavaPairRDD<Object, Tuple2<Tuple, Tuple>> by
- // ignoring the key (of type Object) and appending the values (the
+ // JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> by
+ // ignoring the key (of type IndexedKey) and appending the values (the
// Tuples)
JavaRDD<Tuple> result = jrdd
.mapPartitions(new ToValueFunction());
@@ -79,7 +79,7 @@ public class MergeJoinConverter implemen
}
private static class ExtractKeyFunction extends
- AbstractFunction1<Tuple, Tuple2<Object, Tuple>> implements
+ AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
Serializable {
private final POMergeJoin poMergeJoin;
@@ -91,7 +91,7 @@ public class MergeJoinConverter implemen
}
@Override
- public Tuple2<Object, Tuple> apply(Tuple tuple) {
+ public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
poMergeJoin.getLRs()[LR_index].attachInput(tuple);
try {
@@ -105,9 +105,10 @@ public class MergeJoinConverter implemen
// If tuple is (AA, 5) and key index is $1, then it lrOut is 0 5 (AA),
// so get(1) returns key
+ Object index = ((Tuple) lrOut.result).get(0);
Object key = ((Tuple) lrOut.result).get(1);
Tuple value = tuple;
- Tuple2<Object, Tuple> tuple_KeyValue = new Tuple2<Object, Tuple>(key,
+ Tuple2<IndexedKey, Tuple> tuple_KeyValue = new Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte)index,key),
value);
return tuple_KeyValue;
@@ -119,23 +120,23 @@ public class MergeJoinConverter implemen
}
private static class ToValueFunction
- implements FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
+ implements FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
- Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> in;
+ Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
Tuple2TransformIterable(
- Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+ Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
in = input;
}
public Iterator<Tuple> iterator() {
- return new IteratorTransform<Tuple2<Object, Tuple2<Tuple, Tuple>>, Tuple>(
+ return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>, Tuple>(
in) {
@Override
protected Tuple transform(
- Tuple2<Object, Tuple2<Tuple, Tuple>> next) {
+ Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
try {
Tuple leftTuple = next._2()._1();
@@ -165,7 +166,7 @@ public class MergeJoinConverter implemen
@Override
public Iterable<Tuple> call(
- Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+ Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
return new Tuple2TransformIterable(input);
}
}