Author: xuefu
Date: Fri Jan 30 18:37:12 2015
New Revision: 1656084

URL: http://svn.apache.org/r1656084
Log:
HIVE-9192: One-pass SMB Optimizations [Spark Branch] (Szehon via Xuefu)

Added:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
Modified:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1656084&r1=1656083&r2=1656084&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
 Fri Jan 30 18:37:12 2015
@@ -1,20 +1,20 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
 import java.util.List;
@@ -36,8 +36,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
 /**
-* Operator factory for Spark SMBJoin processing.
-*/
+ * Operator factory for Spark SMBJoin processing.
+ */
 public final class SparkSortMergeJoinFactory {
 
   private SparkSortMergeJoinFactory() {
@@ -45,131 +45,79 @@ public final class SparkSortMergeJoinFac
   }
 
   /**
-   * Get the branch on which we are invoked (walking) from.  See diagram below.
-   * We are at the SMBJoinOp and could have come from TS of any of the input 
tables.
-   */
-  public static int getPositionParent(SMBMapJoinOperator op,
-      Stack<Node> stack) {
-    int size = stack.size();
-    assert size >= 2 && stack.get(size - 1) == op;
-    @SuppressWarnings("unchecked")
-    Operator<? extends OperatorDesc> parent =
-        (Operator<? extends OperatorDesc>) stack.get(size - 2);
-    List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
-    int pos = parOp.indexOf(parent);
-    return pos;
-  }
-
-  /**
-   * SortMergeMapJoin processor, input is a SMBJoinOp that is part of a 
MapWork:
-   *
-   *  MapWork:
-   *
-   *   (Big)   (Small)  (Small)
-   *    TS       TS       TS
-   *     \       |       /
-   *       \     DS     DS
-   *         \   |    /
-   *          SMBJoinOP
+   * Annotate MapWork, input is a SMBJoinOp that is part of a MapWork, and its 
root TS operator.
    *
    * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
    * 2. Adds the bucketing information to the MapWork.
    * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing 
to small-table's TS.
+   * @param context proc walker context
+   * @param mapWork mapwork to annotate
+   * @param smbMapJoinOp SMB Map Join operator to get data
+   * @param ts Table Scan operator to get data
+   * @param local Whether ts is from a 'local' source (small-table that will 
be loaded by SMBJoin 'local' task)
    */
-  private static class SortMergeJoinProcessor implements NodeProcessor {
+  public static void annotateMapWork(GenSparkProcContext context, MapWork 
mapWork,
+    SMBMapJoinOperator smbMapJoinOp, TableScanOperator ts, boolean local)
+    throws SemanticException {
+    initSMBJoinPlan(context, mapWork, ts, local);
+    setupBucketMapJoinInfo(mapWork, smbMapJoinOp);
+  }
 
-    public static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator 
currMapJoinOp) {
-      if (currMapJoinOp != null) {
-        Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
-            currMapJoinOp.getConf().getAliasBucketFileNameMapping();
-        if (aliasBucketFileNameMapping != null) {
-          MapredLocalWork localPlan = plan.getMapRedLocalWork();
-          if (localPlan == null) {
-            localPlan = currMapJoinOp.getConf().getLocalWork();
-          } else {
-            // local plan is not null, we want to merge it into 
SMBMapJoinOperator's local work
-            MapredLocalWork smbLocalWork = 
currMapJoinOp.getConf().getLocalWork();
-            if (smbLocalWork != null) {
-              
localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
-              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
-            }
+  private static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator 
currMapJoinOp) {
+    if (currMapJoinOp != null) {
+      Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+        currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+      if (aliasBucketFileNameMapping != null) {
+        MapredLocalWork localPlan = plan.getMapRedLocalWork();
+        if (localPlan == null) {
+          localPlan = currMapJoinOp.getConf().getLocalWork();
+        } else {
+          // local plan is not null, we want to merge it into 
SMBMapJoinOperator's local work
+          MapredLocalWork smbLocalWork = 
currMapJoinOp.getConf().getLocalWork();
+          if (smbLocalWork != null) {
+            
localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+            localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
           }
+        }
 
-          if (localPlan == null) {
-            return;
-          }
-          plan.setMapRedLocalWork(null);
-          currMapJoinOp.getConf().setLocalWork(localPlan);
+        if (localPlan == null) {
+          return;
+        }
+        plan.setMapRedLocalWork(null);
+        currMapJoinOp.getConf().setLocalWork(localPlan);
 
-          BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
-          localPlan.setBucketMapjoinContext(bucketMJCxt);
-          
bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
-          bucketMJCxt.setBucketFileNameMapping(
-              currMapJoinOp.getConf().getBigTableBucketNumMapping());
-          localPlan.setInputFileChangeSensitive(true);
-          
bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
-          bucketMJCxt
-              
.setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
-          bucketMJCxt.setBigTablePartSpecToFileMapping(
-              currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+        BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+        localPlan.setBucketMapjoinContext(bucketMJCxt);
+        bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+        bucketMJCxt.setBucketFileNameMapping(
+          currMapJoinOp.getConf().getBigTableBucketNumMapping());
+        localPlan.setInputFileChangeSensitive(true);
+        
bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+        bucketMJCxt
+          
.setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+        bucketMJCxt.setBigTablePartSpecToFileMapping(
+          currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
 
-          plan.setUseBucketizedHiveInputFormat(true);
+        plan.setUseBucketizedHiveInputFormat(true);
 
-        }
       }
     }
+  }
 
-    /**
-     * Initialize the mapWork.
-     *
-     * @param opProcCtx
-     *          processing context
-     */
-    private static void initSMBJoinPlan(MapWork mapWork,
-                                        GenSparkProcContext opProcCtx, boolean 
local)
-            throws SemanticException {
-      TableScanOperator ts = (TableScanOperator) opProcCtx.currentRootOperator;
-      String currAliasId = findAliasId(opProcCtx, ts);
-      GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
-         opProcCtx.inputs, null, ts, currAliasId, opProcCtx.conf, local);
-    }
+  private static void initSMBJoinPlan(GenSparkProcContext opProcCtx,
+    MapWork mapWork, TableScanOperator currentRootOperator, boolean local)
+    throws SemanticException {
+    String currAliasId = findAliasId(opProcCtx, currentRootOperator);
+    GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
+      opProcCtx.inputs, null, currentRootOperator, currAliasId, 
opProcCtx.conf, local);
+  }
 
-    private static String findAliasId(GenSparkProcContext opProcCtx, 
TableScanOperator ts) {
-      for (String alias : opProcCtx.topOps.keySet()) {
-        if (opProcCtx.topOps.get(alias) == ts) {
-          return alias;
-        }
+  private static String findAliasId(GenSparkProcContext opProcCtx, 
TableScanOperator ts) {
+    for (String alias : opProcCtx.topOps.keySet()) {
+      if (opProcCtx.topOps.get(alias) == ts) {
+        return alias;
       }
-      return null;
     }
-
-    /**
-     * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
-     * 2. Adds the bucketing information to the MapWork.
-     * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing 
to small-table's TS.
-     */
-    @Override
-    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
-        Object... nodeOutputs) throws SemanticException {
-      SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
-      GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
-
-      // find the branch on which this processor was invoked
-      int pos = getPositionParent(mapJoin, stack);
-      boolean local = pos != mapJoin.getConf().getPosBigTable();
-
-      MapWork mapWork = ctx.smbJoinWorkMap.get(mapJoin);
-      initSMBJoinPlan(mapWork, ctx, local);
-
-      // find the associated mapWork that contains this processor.
-      setupBucketMapJoinInfo(mapWork, mapJoin);
-
-      // local aliases need not to hand over context further
-      return false;
-    }
-  }
-
-  public static NodeProcessor getTableScanMapJoin() {
-    return new SortMergeJoinProcessor();
+    return null;
   }
 }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1656084&r1=1656083&r2=1656084&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
 Fri Jan 30 18:37:12 2015
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -44,6 +43,7 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
@@ -103,8 +103,8 @@ public class GenSparkProcContext impleme
   // map that says which mapjoin belongs to which work item
   public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
 
-  // a map to keep track of which MapWork item holds which SMBMapJoinOp
-  public final Map<SMBMapJoinOperator, MapWork> smbJoinWorkMap;
+  // Map to keep track of which SMB Join operators and their information to 
annotate their MapWork with.
+  public final Map<SMBMapJoinOperator, SparkSMBMapJoinInfo> smbMapJoinCtxMap;
 
   // a map to keep track of which root generated which work
   public final Map<Operator<?>, BaseWork> rootToWorkMap;
@@ -160,7 +160,7 @@ public class GenSparkProcContext impleme
         new LinkedHashMap<ReduceSinkOperator, ObjectPair<SparkEdgeProperty, 
ReduceWork>>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, 
SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, 
List<ReduceSinkOperator>>();
-    this.smbJoinWorkMap = new LinkedHashMap<SMBMapJoinOperator, MapWork>();
+    this.smbMapJoinCtxMap = new HashMap<SMBMapJoinOperator, 
SparkSMBMapJoinInfo>();
     this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1656084&r1=1656083&r2=1656084&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
 Fri Jan 30 18:37:12 2015
@@ -41,10 +41,12 @@ import org.apache.hadoop.hive.ql.exec.Ha
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -443,6 +445,25 @@ public class GenSparkUtils {
     return null;
   }
 
+  /**
+   * Fill MapWork with 'local' work and bucket information for SMB Join.
+   * @param context context, containing references to MapWorks and their SMB 
information.
+   * @throws SemanticException
+   */
+  public void annotateMapWork(GenSparkProcContext context) throws 
SemanticException {
+    for (SMBMapJoinOperator smbMapJoinOp : context.smbMapJoinCtxMap.keySet()) {
+      //initialize mapwork with smbMapJoin information.
+      SparkSMBMapJoinInfo smbMapJoinInfo = 
context.smbMapJoinCtxMap.get(smbMapJoinOp);
+      MapWork work = smbMapJoinInfo.mapWork;
+      SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
+        (TableScanOperator) smbMapJoinInfo.bigTableRootOp, false);
+      for (Operator<?> smallTableRootOp : smbMapJoinInfo.smallTableRootOps) {
+        SparkSortMergeJoinFactory.annotateMapWork(context, work, smbMapJoinOp,
+          (TableScanOperator) smallTableRootOp, true);
+      }
+    }
+  }
+
   public synchronized int getNextSeqNumber() {
     return ++sequenceNumber;
   }

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1656084&r1=1656083&r2=1656084&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
 Fri Jan 30 18:37:12 2015
@@ -34,10 +34,12 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -118,18 +120,12 @@ public class GenSparkWork implements Nod
     } else {
       // create a new vertex
       if (context.preceedingWork == null) {
-        if (smbOp != null) {
-          // This logic is for SortMergeBucket MapJoin case.
-          // This MapWork (of big-table, see above..) is later initialized by 
SparkMapJoinFactory
-          // processor, so don't initialize it here. Just keep track of it in 
the context,
-          // for later processing.
-          work = utils.createMapWork(context, root, sparkWork, null, true);
-          if (context.smbJoinWorkMap.get(smbOp) != null) {
-            throw new SemanticException("Each SMBMapJoin should be associated 
only with one Mapwork");
-          }
-          context.smbJoinWorkMap.put(smbOp, (MapWork) work);
-        } else {
+        if (smbOp == null) {
           work = utils.createMapWork(context, root, sparkWork, null);
+        } else {
+          //save work to be initialized later with SMB information.
+          work = utils.createMapWork(context, root, sparkWork, null, true);
+          context.smbMapJoinCtxMap.get(smbOp).mapWork = (MapWork) work;
         }
       } else {
         work = utils.createReduceWork(context, root, sparkWork);

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1656084&r1=1656083&r2=1656084&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
 Fri Jan 30 18:37:12 2015
@@ -186,17 +186,30 @@ public class SparkCompiler extends TaskC
      *
      * Some of the other processors are expecting only one traversal beyond 
SMBJoinOp.
      * We need to traverse from the big-table path only, and stop traversing 
on the small-table path once we reach SMBJoinOp.
+     * Also add some SMB join information to the context, so we can properly 
annotate the MapWork later on.
      */
     opRules.put(new TypeRule(SMBMapJoinOperator.class),
       new NodeProcessor() {
         @Override
         public Object process(Node currNode, Stack<Node> stack,
                               NodeProcessorCtx procCtx, Object... os) throws 
SemanticException {
+          GenSparkProcContext context = (GenSparkProcContext) procCtx;
+          SMBMapJoinOperator currSmbNode = (SMBMapJoinOperator) currNode;
+          SparkSMBMapJoinInfo smbMapJoinCtx = 
context.smbMapJoinCtxMap.get(currSmbNode);
+          if (smbMapJoinCtx == null) {
+            smbMapJoinCtx = new SparkSMBMapJoinInfo();
+            context.smbMapJoinCtxMap.put(currSmbNode, smbMapJoinCtx);
+          }
+
           for (Node stackNode : stack) {
             if (stackNode instanceof DummyStoreOperator) {
+              //If coming from small-table side, do some book-keeping, and 
skip traversal.
+              smbMapJoinCtx.smallTableRootOps.add(context.currentRootOperator);
               return true;
             }
           }
+          //If coming from big-table side, do some book-keeping, and continue 
traversal
+          smbMapJoinCtx.bigTableRootOp = context.currentRootOperator;
           return false;
         }
       }
@@ -210,24 +223,14 @@ public class SparkCompiler extends TaskC
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
-
-    // ------------------- Second Pass -----------------------
-    // SMB Join optimizations to add the "localWork" and bucketing data 
structures to MapWork.
-    opRules.clear();
-    opRules.put(new TypeRule(SMBMapJoinOperator.class),
-       SparkSortMergeJoinFactory.getTableScanMapJoin());
-
-    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
-    ogw = new GenSparkWorkWalker(disp, procCtx);
-    ogw.startWalking(topNodes, null);
-
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
       GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
     }
 
+    // we need to fill MapWork with 'local' work and bucket information for 
SMB Join.
+    GenSparkUtils.getUtils().annotateMapWork(procCtx);
+
     // finally make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);

Added: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java?rev=1656084&view=auto
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
 (added)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkSMBMapJoinInfo.java
 Fri Jan 30 18:37:12 2015
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Data structure to keep track of SMBMapJoin operators during query 
compilation for Spark.
+ */
+public class SparkSMBMapJoinInfo {
+  Operator<?> bigTableRootOp;
+  List<Operator<?>> smallTableRootOps = new ArrayList<Operator<?>>();
+  MapWork mapWork;
+}


Reply via email to