Author: xuefu
Date: Thu Jul 31 08:20:19 2014
New Revision: 1614825
URL: http://svn.apache.org/r1614825
Log:
HIVE-7526: Research using the groupBy transformation to replace Hive's existing
partitionByKey and SparkCollector combination [Spark Branch]
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
Thu Jul 31 08:20:19 2014
@@ -55,7 +55,6 @@ BytesWritable, BytesWritable> {
collector.clear();
while(it.hasNext() && !ExecMapper.getDone()) {
Tuple2<BytesWritable, BytesWritable> input = it.next();
- System.out.println("mapper input: " + input._1() + ", " + input._2());
mapper.map(input._1(), input._2(), collector, Reporter.NULL);
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
Thu Jul 31 08:20:19 2014
@@ -18,11 +18,7 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.io.BytesWritable;
@@ -32,7 +28,7 @@ import org.apache.spark.api.java.functio
import scala.Tuple2;
-public class HiveReduceFunction implements
PairFlatMapFunction<Iterator<Tuple2<BytesWritable,BytesWritable>>,
+public class HiveReduceFunction implements
PairFlatMapFunction<Iterator<Tuple2<BytesWritable,Iterable<BytesWritable>>>,
BytesWritable, BytesWritable> {
private static final long serialVersionUID = 1L;
@@ -47,8 +43,8 @@ BytesWritable, BytesWritable> {
}
@Override
- public Iterable<Tuple2<BytesWritable, BytesWritable>>
call(Iterator<Tuple2<BytesWritable,BytesWritable>> it)
- throws Exception {
+ public Iterable<Tuple2<BytesWritable, BytesWritable>>
+ call(Iterator<Tuple2<BytesWritable,Iterable<BytesWritable>>> it) throws
Exception {
if (jobConf == null) {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
@@ -59,25 +55,9 @@ BytesWritable, BytesWritable> {
}
collector.clear();
- Map<BytesWritable, List<BytesWritable>> clusteredRows =
- new HashMap<BytesWritable, List<BytesWritable>>();
while (it.hasNext()) {
- Tuple2<BytesWritable, BytesWritable> input = it.next();
- BytesWritable key = input._1();
- BytesWritable value = input._2();
- System.out.println("reducer row: " + key + "/" + value);
- // cluster the input according to key.
- List<BytesWritable> valueList = clusteredRows.get(key);
- if (valueList == null) {
- valueList = new ArrayList<BytesWritable>();
- clusteredRows.put(key, valueList);
- }
- valueList.add(value);
- }
-
- for (Map.Entry<BytesWritable, List<BytesWritable>> entry :
clusteredRows.entrySet()) {
- // pass on the clustered result to the reducer operator tree.
- reducer.reduce(entry.getKey(), entry.getValue().iterator(), collector,
Reporter.NULL);
+ Tuple2<BytesWritable, Iterable<BytesWritable>> tup = it.next();
+ reducer.reduce(tup._1(), tup._2().iterator(), collector, Reporter.NULL);
}
reducer.close();
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
Thu Jul 31 08:20:19 2014
@@ -22,16 +22,21 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.spark.api.java.JavaPairRDD;
public class ReduceTran implements SparkTran {
+ private SparkShuffler shuffler;
private HiveReduceFunction reduceFunc;
@Override
public JavaPairRDD<BytesWritable, BytesWritable> transform(
JavaPairRDD<BytesWritable, BytesWritable> input) {
- return input.mapPartitionsToPair(reduceFunc);
+ return shuffler.shuffle(input).mapPartitionsToPair(reduceFunc);
}
public void setReduceFunction(HiveReduceFunction redFunc) {
this.reduceFunc = redFunc;
}
+ public void setShuffler(SparkShuffler shuffler) {
+ this.shuffler = shuffler;
+ }
+
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1614825&r1=1614824&r2=1614825&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Thu Jul 31 08:20:19 2014
@@ -62,10 +62,12 @@ public class SparkPlanGenerator {
MapWork mapWork = (MapWork) w;
trans.add(generate(w));
while (sparkWork.getChildren(w).size() > 0) {
- BaseWork child = sparkWork.getChildren(w).get(0);
+ ReduceWork child = (ReduceWork) sparkWork.getChildren(w).get(0);
SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
- trans.add(generate(edge));
- trans.add(generate(child));
+ SparkShuffler st = generate(edge);
+ ReduceTran rt = generate(child);
+ rt.setShuffler(st);
+ trans.add(rt);
w = child;
}
ChainedTran chainedTran = new ChainedTran(trans);
@@ -107,9 +109,9 @@ public class SparkPlanGenerator {
return result;
}
- private ShuffleTran generate(SparkEdgeProperty edge) {
- // TODO: based on edge type, create groupBy or sortBy transformations.
- return new ShuffleTran();
+ private SparkShuffler generate(SparkEdgeProperty edge) {
+ // TODO: create different shuffler based on edge prop.
+ return new GroupByShuffler();
}
private ReduceTran generate(ReduceWork rw) throws IOException {