Author: xuefu
Date: Tue Jun 30 03:12:43 2015
New Revision: 1688346
URL: http://svn.apache.org/r1688346
Log:
PIG-4615: Fix null keys join in SkewedJoin in spark mode (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java?rev=1688346&r1=1688345&r2=1688346&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
Tue Jun 30 03:12:43 2015
@@ -64,26 +64,26 @@ public class SkewedJoinConverter impleme
RDD<Tuple> rdd1 = predecessors.get(0);
RDD<Tuple> rdd2 = predecessors.get(1);
- // make (key, value) pairs, key has type Object, value has type Tuple
- RDD<Tuple2<Object, Tuple>> rdd1Pair = rdd1.map(new ExtractKeyFunction(
- this, 0), SparkUtil.<Object, Tuple>getTuple2Manifest());
- RDD<Tuple2<Object, Tuple>> rdd2Pair = rdd2.map(new ExtractKeyFunction(
- this, 1), SparkUtil.<Object, Tuple>getTuple2Manifest());
+ // make (key, value) pairs, key has type IndexedKey, value has type
Tuple
+ RDD<Tuple2<IndexedKey, Tuple>> rdd1Pair = rdd1.map(new
ExtractKeyFunction(
+ this, 0), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
+ RDD<Tuple2<IndexedKey, Tuple>> rdd2Pair = rdd2.map(new
ExtractKeyFunction(
+ this, 1), SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
// join fn is present in JavaPairRDD class ..
- JavaPairRDD<Object, Tuple> rdd1Pair_javaRDD = new JavaPairRDD<Object,
Tuple>(
- rdd1Pair, SparkUtil.getManifest(Object.class),
+ JavaPairRDD<IndexedKey, Tuple> rdd1Pair_javaRDD = new
JavaPairRDD<IndexedKey, Tuple>(
+ rdd1Pair, SparkUtil.getManifest(IndexedKey.class),
SparkUtil.getManifest(Tuple.class));
- JavaPairRDD<Object, Tuple> rdd2Pair_javaRDD = new JavaPairRDD<Object,
Tuple>(
- rdd2Pair, SparkUtil.getManifest(Object.class),
+ JavaPairRDD<IndexedKey, Tuple> rdd2Pair_javaRDD = new
JavaPairRDD<IndexedKey, Tuple>(
+ rdd2Pair, SparkUtil.getManifest(IndexedKey.class),
SparkUtil.getManifest(Tuple.class));
// join() returns (key, (t1, t2)) where (key, t1) is in this and (key,
t2) is in other
- JavaPairRDD<Object, Tuple2<Tuple, Tuple>> result_KeyValue =
rdd1Pair_javaRDD
+ JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> result_KeyValue =
rdd1Pair_javaRDD
.join(rdd2Pair_javaRDD);
- // map to get JavaRDD<Tuple> from JAvaPairRDD<Object, Tuple2<Tuple,
Tuple>> by
- // ignoring the key (of type Object) and appending the values (the
+ // map to get JavaRDD<Tuple> from JavaPairRDD<IndexedKey,
Tuple2<Tuple, Tuple>> by
+ // ignoring the key (of type IndexedKey) and appending the values (the
// Tuples)
JavaRDD<Tuple> result = result_KeyValue
.mapPartitions(new ToValueFunction());
@@ -115,7 +115,7 @@ public class SkewedJoinConverter impleme
}
private static class ExtractKeyFunction extends
- AbstractFunction1<Tuple, Tuple2<Object, Tuple>> implements
+ AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
Serializable {
private final SkewedJoinConverter poSkewedJoin;
@@ -127,7 +127,7 @@ public class SkewedJoinConverter impleme
}
@Override
- public Tuple2<Object, Tuple> apply(Tuple tuple) {
+ public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
// attach tuple to LocalRearrange
poSkewedJoin.LRs[LR_index].attachInput(tuple);
@@ -138,13 +138,14 @@ public class SkewedJoinConverter impleme
// If tuple is (AA, 5) and key index is $1, then it lrOut is 0
5
// (AA), so get(1) returns key
+ Byte index = (Byte)((Tuple) lrOut.result).get(0);
Object key = ((Tuple) lrOut.result).get(1);
-
+ IndexedKey indexedKey = new IndexedKey(index,key);
Tuple value = tuple;
// make a (key, value) pair
- Tuple2<Object, Tuple> tuple_KeyValue = new Tuple2<Object,
Tuple>(
- key, value);
+ Tuple2<IndexedKey, Tuple> tuple_KeyValue = new
Tuple2<IndexedKey, Tuple>(
+ indexedKey, value);
return tuple_KeyValue;
@@ -158,23 +159,23 @@ public class SkewedJoinConverter impleme
private static class ToValueFunction
implements
- FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>>,
Tuple>, Serializable {
+ FlatMapFunction<Iterator<Tuple2<IndexedKey, Tuple2<Tuple,
Tuple>>>, Tuple>, Serializable {
private class Tuple2TransformIterable implements Iterable<Tuple> {
- Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> in;
+ Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
Tuple2TransformIterable(
- Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+ Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
in = input;
}
public Iterator<Tuple> iterator() {
- return new IteratorTransform<Tuple2<Object, Tuple2<Tuple,
Tuple>>, Tuple>(
+ return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple,
Tuple>>, Tuple>(
in) {
@Override
protected Tuple transform(
- Tuple2<Object, Tuple2<Tuple, Tuple>> next) {
+ Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
try {
Tuple leftTuple = next._2._1;
@@ -208,7 +209,7 @@ public class SkewedJoinConverter impleme
@Override
public Iterable<Tuple> call(
- Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+ Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
return new Tuple2TransformIterable(input);
}
}