Author: xuefu
Date: Wed Sep 17 16:17:34 2014
New Revision: 1625642

URL: http://svn.apache.org/r1625642
Log:
HIVE-8141: Refactor the GraphTran code by moving union handling logic to 
UnionTran [Spark Branch] (Na via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java?rev=1625642&r1=1625641&r2=1625642&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/GraphTran.java
 Wed Sep 17 16:17:34 2014
@@ -67,29 +67,12 @@ public class GraphTran {
 
       while (getChildren(tran).size() > 0) {
         SparkTran childTran = getChildren(tran).get(0);
-        if (childTran instanceof UnionTran) {
-          List<JavaPairRDD<HiveKey, BytesWritable>> unionInputList = 
unionInputs
-              .get(childTran);
-          if (unionInputList == null) {
-            // process the first union input RDD, cache it in the hash map
-            unionInputList = new LinkedList<JavaPairRDD<HiveKey, 
BytesWritable>>();
-            unionInputList.add(rdd);
-            unionInputs.put(childTran, unionInputList);
-            break;
-          } else if (unionInputList.size() < this.getParents(childTran).size() 
- 1) {
-            // not the last input RDD yet, continue caching it in the hash map
-            unionInputList.add(rdd);
-            break;
-          } else if (unionInputList.size() == 
this.getParents(childTran).size() - 1) { // process
-            // process the last input RDD
-            for (JavaPairRDD<HiveKey, BytesWritable> inputRDD : 
unionInputList) {
-              ((UnionTran) childTran).setOtherInput(inputRDD);
-              rdd = childTran.transform(rdd);
-            }
-          }
-        } else {
-          rdd = childTran.transform(rdd);
+        if (childTran instanceof UnionTran &&
+            this.getParents(childTran).size() > 
((UnionTran)childTran).getOtherInputList().size() + 1) {
+          ((UnionTran) childTran).addOtherInput(rdd);
+          break;
         }
+        rdd = childTran.transform(rdd);
         tran = childTran;
       }
       // if the current transformation is a leaf tran and it has not got 
processed yet, cache its corresponding RDD 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java?rev=1625642&r1=1625641&r2=1625642&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/UnionTran.java
 Wed Sep 17 16:17:34 2014
@@ -18,24 +18,30 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.util.LinkedList;
+import java.util.List;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
 public class UnionTran implements SparkTran<HiveKey, HiveKey> {
-  JavaPairRDD<HiveKey, BytesWritable> otherInput;
+  List<JavaPairRDD<HiveKey, BytesWritable>> otherInputsList = new 
LinkedList<JavaPairRDD<HiveKey, BytesWritable>>();
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> transform(
       JavaPairRDD<HiveKey, BytesWritable> input) {
-    return input.union(otherInput);
+    JavaPairRDD<HiveKey, BytesWritable> result = input;
+    for (JavaPairRDD<HiveKey, BytesWritable> otherInput : otherInputsList) {
+      result = result.union(otherInput);
+    }
+    return result;
   }
 
-  public void setOtherInput(JavaPairRDD<HiveKey, BytesWritable> otherInput) {
-    this.otherInput = otherInput;
+  public void addOtherInput(JavaPairRDD<HiveKey, BytesWritable> input) {
+    otherInputsList.add(input);
   }
 
-  public JavaPairRDD<HiveKey, BytesWritable> getOtherInput() {
-    return this.otherInput;
+  public List<JavaPairRDD<HiveKey, BytesWritable>> getOtherInputList() {
+    return this.otherInputsList;
   }
 }


Reply via email to