Author: xuefu
Date: Wed Sep 17 16:17:34 2014
New Revision: 1625642
URL: http://svn.apache.org/r1625642
Log:
HIVE-8141: Refactor the GraphTran code by moving union handling logic to
UnionTran [Spark Branch] (Na via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java?rev=1625642&r1=1625641&r2=1625642&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java Wed Sep 17 16:17:34 2014
@@ -67,29 +67,12 @@ public class GraphTran {
while (getChildren(tran).size() > 0) {
SparkTran childTran = getChildren(tran).get(0);
- if (childTran instanceof UnionTran) {
- List<JavaPairRDD<HiveKey, BytesWritable>> unionInputList = unionInputs
- .get(childTran);
- if (unionInputList == null) {
- // process the first union input RDD, cache it in the hash map
- unionInputList = new LinkedList<JavaPairRDD<HiveKey, BytesWritable>>();
- unionInputList.add(rdd);
- unionInputs.put(childTran, unionInputList);
- break;
- } else if (unionInputList.size() < this.getParents(childTran).size() - 1) {
- // not the last input RDD yet, continue caching it in the hash map
- unionInputList.add(rdd);
- break;
- } else if (unionInputList.size() == this.getParents(childTran).size() - 1) { // process
- // process the last input RDD
- for (JavaPairRDD<HiveKey, BytesWritable> inputRDD : unionInputList) {
- ((UnionTran) childTran).setOtherInput(inputRDD);
- rdd = childTran.transform(rdd);
- }
- }
- } else {
- rdd = childTran.transform(rdd);
+ if (childTran instanceof UnionTran &&
+ this.getParents(childTran).size() > ((UnionTran)childTran).getOtherInputList().size() + 1) {
+ ((UnionTran) childTran).addOtherInput(rdd);
+ break;
}
+ rdd = childTran.transform(rdd);
tran = childTran;
}
// if the current transformation is a leaf tran and it has not got processed yet, cache its corresponding RDD
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java?rev=1625642&r1=1625641&r2=1625642&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java Wed Sep 17 16:17:34 2014
@@ -18,24 +18,30 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.util.LinkedList;
+import java.util.List;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
public class UnionTran implements SparkTran<HiveKey, HiveKey> {
- JavaPairRDD<HiveKey, BytesWritable> otherInput;
+ List<JavaPairRDD<HiveKey, BytesWritable>> otherInputsList = new LinkedList<JavaPairRDD<HiveKey, BytesWritable>>();
@Override
public JavaPairRDD<HiveKey, BytesWritable> transform(
JavaPairRDD<HiveKey, BytesWritable> input) {
- return input.union(otherInput);
+ JavaPairRDD<HiveKey, BytesWritable> result = input;
+ for (JavaPairRDD<HiveKey, BytesWritable> otherInput : otherInputsList) {
+ result = result.union(otherInput);
+ }
+ return result;
}
- public void setOtherInput(JavaPairRDD<HiveKey, BytesWritable> otherInput) {
- this.otherInput = otherInput;
+ public void addOtherInput(JavaPairRDD<HiveKey, BytesWritable> input) {
+ otherInputsList.add(input);
}
- public JavaPairRDD<HiveKey, BytesWritable> getOtherInput() {
- return this.otherInput;
+ public List<JavaPairRDD<HiveKey, BytesWritable>> getOtherInputList() {
+ return this.otherInputsList;
}
}