Author: gunther
Date: Mon Dec 9 03:00:13 2013
New Revision: 1549374
URL: http://svn.apache.org/r1549374
Log:
HIVE-5984: Multi insert statement fails on Tez (Gunther Hagleitner)
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
Mon Dec 9 03:00:13 2013
@@ -36,9 +36,9 @@ import org.apache.hadoop.hive.ql.parse.S
public class DefaultGraphWalker implements GraphWalker {
protected Stack<Node> opStack;
- private final List<Node> toWalk = new ArrayList<Node>();
- private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
- private final Dispatcher dispatcher;
+ protected final List<Node> toWalk = new ArrayList<Node>();
+ protected final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+ protected final Dispatcher dispatcher;
/**
* Constructor.
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
Mon Dec 9 03:00:13 2013
@@ -19,18 +19,16 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.Serializable;
-import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Stack;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -61,10 +59,6 @@ public class GenTezProcContext implement
public final Set<ReadEntity> inputs;
public final Set<WriteEntity> outputs;
- // rootOperators are all the table scan operators in sequence
- // of traversal
- public final Deque<Operator<? extends OperatorDesc>> rootOperators;
-
// holds the root of the operator tree we're currently processing
// this could be a table scan, but also a join, ptf, etc (i.e.:
// first operator of a reduce task.
@@ -98,6 +92,9 @@ public class GenTezProcContext implement
// a map that maintains operator (file-sink or reduce-sink) to work mapping
public final Map<Operator<?>, BaseWork> operatorWorkMap;
+ // a map to keep track of which root generated which work
+ public final Map<Operator<?>, BaseWork> rootToWorkMap;
+
// we need to keep the original list of operators in the map join to know
// what position in the mapjoin the different parent work items will have.
public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
@@ -108,19 +105,10 @@ public class GenTezProcContext implement
// used to group dependent tasks for multi table inserts
public final DependencyCollectionTask dependencyTask;
- // root of last multi child operator encountered
- public Stack<Operator<?>> lastRootOfMultiChildOperator;
-
- // branches of current multi-child operator
- public Stack<Integer> currentBranchCount;
-
- // work generated for last multi-child operator
- public Stack<BaseWork> lastWorkForMultiChildOperator;
-
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>>
rootTasks,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs, Deque<Operator<?>>
rootOperators) {
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
this.conf = conf;
this.parseContext = parseContext;
@@ -130,16 +118,15 @@ public class GenTezProcContext implement
this.outputs = outputs;
this.currentTask = (TezTask) TaskFactory.get(new TezWork(), conf);
this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
- this.rootOperators = rootOperators;
this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
this.linkWorkWithReduceSinkMap = new HashMap<BaseWork,
List<ReduceSinkOperator>>();
this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
+ this.rootToWorkMap = new HashMap<Operator<?>, BaseWork>();
this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
this.linkChildOpWithDummyOp = new HashMap<Operator<?>,
List<Operator<?>>>();
this.dependencyTask = (DependencyCollectionTask)
TaskFactory.get(new DependencyCollectionWork(), conf);
- this.lastRootOfMultiChildOperator = new Stack<Operator<?>>();
- this.currentBranchCount = new Stack<Integer>();
- this.lastWorkForMultiChildOperator = new Stack<BaseWork>();
+
+ rootTasks.add(currentTask);
}
}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
Mon Dec 9 03:00:13 2013
@@ -48,9 +48,9 @@ public class GenTezWork implements NodeP
static final private Log LOG = LogFactory.getLog(GenTezWork.class.getName());
+ // sequence number is used to name vertices (e.g.: Map 1, Reduce 14, ...)
private int sequenceNumber = 0;
- @SuppressWarnings("unchecked")
@Override
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -62,130 +62,39 @@ public class GenTezWork implements NodeP
// a new vertex.
Operator<?> operator = (Operator<?>) nd;
- TezWork tezWork = context.currentTask.getWork();
- if (!context.rootTasks.contains(context.currentTask)) {
- context.rootTasks.add(context.currentTask);
- }
-
// root is the start of the operator pipeline we're currently
// packing into a vertex, typically a table scan, union or join
Operator<?> root = context.currentRootOperator;
- if (root == null) {
- // null means that we're starting with a new table scan
- // the graph walker walks the rootOperators in the same
- // order so we can just take the next
- context.preceedingWork = null;
-
- // if there are branches remaining we can't pop the next
- // root operator yet.
- if (context.currentBranchCount.isEmpty()
- || (!context.lastWorkForMultiChildOperator.isEmpty()
- && context.lastWorkForMultiChildOperator.peek() == null)) {
- root = context.rootOperators.pop();
- }
- }
LOG.debug("Root operator: " + root);
LOG.debug("Leaf operator: " + operator);
+ TezWork tezWork = context.currentTask.getWork();
+
// Right now the work graph is pretty simple. If there is no
// Preceding work we have a root and will generate a map
// vertex. If there is a preceding work we will generate
// a reduce vertex
BaseWork work;
- if (context.preceedingWork == null) {
- if (root == null) {
- // this is the multi-insert case. we need to reuse the last
- // table scan work.
- root = context.lastRootOfMultiChildOperator.peek();
- work = context.lastWorkForMultiChildOperator.peek();
- LOG.debug("Visiting additional branch in: "+root);
-
- } else {
- assert root.getParentOperators().isEmpty();
- MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
- LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
-
- // map work starts with table scan operators
- assert root instanceof TableScanOperator;
- String alias = ((TableScanOperator)root).getConf().getAlias();
-
- GenMapRedUtils.setMapWork(mapWork, context.parseContext,
- context.inputs, null, root, alias, context.conf, false);
- tezWork.add(mapWork);
- work = mapWork;
-
- // remember this table scan and work item. this is needed for multiple
- // insert statements where multiple operator pipelines hang of a single
- // table scan
- if (!context.lastWorkForMultiChildOperator.isEmpty()
- && context.lastWorkForMultiChildOperator.peek() == null) {
- LOG.debug("Capturing current work for 'multiple branches' case");
- context.lastWorkForMultiChildOperator.pop();
- context.lastWorkForMultiChildOperator.push(work);
- }
- }
-
- if (!context.currentBranchCount.isEmpty()) {
- // we've handled one branch. Adjust the counts.
- int branches = context.currentBranchCount.pop();
- if (--branches != 0) {
- LOG.debug("Remaining branches: "+branches);
- context.currentBranchCount.push(branches);
- } else {
- LOG.debug("No more remaining branches.");
- context.lastRootOfMultiChildOperator.pop();
- context.lastWorkForMultiChildOperator.pop();
- }
- }
-
+ if (context.rootToWorkMap.containsKey(root)) {
+ // having seen the root operator before means there was a branch in the
+ // operator graph. There's typically two reasons for that: a) mux/demux
+ // b) multi insert. Mux/Demux will hit the same leaf again, multi insert
+ // will result into a vertex with multiple FS or RS operators.
+
+ // At this point we don't have to do anything special in this case. Just
+ // run through the regular paces w/o creating a new task.
+ work = context.rootToWorkMap.get(root);
} else {
- assert !root.getParentOperators().isEmpty();
- ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
- LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " +
root);
- reduceWork.setReducer(root);
- reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
-
- // All parents should be reduce sinks. We pick the one we just walked
- // to choose the number of reducers. In the join/union case they will
- // all be -1. In sort/order case where it matters there will be only
- // one parent.
- assert context.parentOfRoot instanceof ReduceSinkOperator;
- ReduceSinkOperator reduceSink = (ReduceSinkOperator)
context.parentOfRoot;
-
- LOG.debug("Setting up reduce sink: " + reduceSink);
-
- reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
-
- // need to fill in information about the key and value in the reducer
- GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
-
- // remember which parent belongs to which tag
- reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
- context.preceedingWork.getName());
-
- // remember the output name of the reduce sink
- reduceSink.getConf().setOutputName(reduceWork.getName());
-
- tezWork.add(reduceWork);
- tezWork.connect(
- context.preceedingWork,
- reduceWork, EdgeType.SIMPLE_EDGE);
-
- work = reduceWork;
-
- // remember this work item. this is needed for multiple
- // insert statements where multiple operator pipelines hang of a forward
- // operator
- if (!context.lastWorkForMultiChildOperator.isEmpty()
- && context.lastWorkForMultiChildOperator.peek() == null) {
- LOG.debug("Capturing current work for 'multiple branches' case");
- context.lastWorkForMultiChildOperator.pop();
- context.lastWorkForMultiChildOperator.push(work);
+ // create a new vertex
+ if (context.preceedingWork == null) {
+ work = createMapWork(context, root, tezWork);
+ } else {
+ work = createReduceWork(context, root, tezWork);
}
+ context.rootToWorkMap.put(root, work);
}
- // We're scanning the operator from table scan to final file sink.
// We're scanning a tree from roots to leaf (this is not technically
// correct, demux and mux operators might form a diamond shape, but
// we will only scan one path and ignore the others, because the
@@ -233,20 +142,15 @@ public class GenTezWork implements NodeP
context.parentOfRoot = operator;
context.currentRootOperator = operator.getChildOperators().get(0);
context.preceedingWork = work;
- } else {
- LOG.debug("Leaf operator - resetting context: " +
context.currentRootOperator);
- context.parentOfRoot = null;
- context.currentRootOperator = null;
- context.preceedingWork = null;
}
/*
* this happens in case of map join operations.
* The tree looks like this:
*
- * RS <--- we are here perhaps
- * |
- * MapJoin
+ * RS <--- we are here perhaps
+ * |
+ * MapJoin
* / \
* RS TS
* /
@@ -266,10 +170,10 @@ public class GenTezWork implements NodeP
}
for (BaseWork parentWork : linkWorkList) {
tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
-
+
// need to set up output name for reduce sink not that we know the name
// of the downstream work
- for (ReduceSinkOperator r:
+ for (ReduceSinkOperator r:
context.linkWorkWithReduceSinkMap.get(parentWork)) {
r.getConf().setOutputName(work.getName());
}
@@ -279,4 +183,65 @@ public class GenTezWork implements NodeP
return null;
}
+ private ReduceWork createReduceWork(GenTezProcContext context, Operator<?>
root,
+ TezWork tezWork) {
+ assert !root.getParentOperators().isEmpty();
+ ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
+ LOG.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
+ reduceWork.setReducer(root);
+ reduceWork.setNeedsTagging(GenMapRedUtils.needsTagging(reduceWork));
+
+ // All parents should be reduce sinks. We pick the one we just walked
+ // to choose the number of reducers. In the join/union case they will
+ // all be -1. In sort/order case where it matters there will be only
+ // one parent.
+ assert context.parentOfRoot instanceof ReduceSinkOperator;
+ ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
+
+ reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
+
+ setupReduceSink(context, reduceWork, reduceSink);
+
+ tezWork.add(reduceWork);
+ tezWork.connect(
+ context.preceedingWork,
+ reduceWork, EdgeType.SIMPLE_EDGE);
+
+ return reduceWork;
+ }
+
+ private void setupReduceSink(GenTezProcContext context, ReduceWork
reduceWork,
+ ReduceSinkOperator reduceSink) {
+
+ LOG.debug("Setting up reduce sink: " + reduceSink
+ + " with following reduce work: " + reduceWork.getName());
+
+ // need to fill in information about the key and value in the reducer
+ GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+ // remember which parent belongs to which tag
+ reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+ context.preceedingWork.getName());
+
+ // remember the output name of the reduce sink
+ reduceSink.getConf().setOutputName(reduceWork.getName());
+ }
+
+ private MapWork createMapWork(GenTezProcContext context, Operator<?> root,
+ TezWork tezWork) throws SemanticException {
+ assert root.getParentOperators().isEmpty();
+ MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+ LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+
+ // map work starts with table scan operators
+ assert root instanceof TableScanOperator;
+ String alias = ((TableScanOperator)root).getConf().getAlias();
+
+ GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+ context.inputs, null, root, alias, context.conf, false);
+ tezWork.add(mapWork);
+
+ return mapWork;
+ }
+
}
Added:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java?rev=1549374&view=auto
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
(added)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
Mon Dec 9 03:00:13 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+/**
+ * Walks the operator tree in DFS fashion.
+ */
+public class GenTezWorkWalker extends DefaultGraphWalker {
+
+ private final GenTezProcContext ctx;
+
+ /**
+ * constructor of the walker - the dispatcher is passed.
+ *
+ * @param disp the dispatcher to be called for each node visited
+ * @param ctx the context where we'll set the current root operator
+ *
+ */
+ public GenTezWorkWalker(Dispatcher disp, GenTezProcContext ctx) {
+ super(disp);
+ this.ctx = ctx;
+ }
+
+ private void setRoot(Node nd) {
+ ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
+ ctx.preceedingWork = null;
+ ctx.parentOfRoot = null;
+ }
+
+ /**
+ * starting point for walking.
+ *
+ * @throws SemanticException
+ */
+ @Override
+ public void startWalking(Collection<Node> startNodes,
+ HashMap<Node, Object> nodeOutput) throws SemanticException {
+ toWalk.addAll(startNodes);
+ while (toWalk.size() > 0) {
+ Node nd = toWalk.remove(0);
+ setRoot(nd);
+ walk(nd);
+ if (nodeOutput != null) {
+ nodeOutput.put(nd, retMap.get(nd));
+ }
+ }
+ }
+
+ /**
+ * Walk the given operator.
+ *
+ * @param nd operator being walked
+ */
+ @Override
+ public void walk(Node nd) throws SemanticException {
+ List<? extends Node> children = nd.getChildren();
+
+ // maintain the stack of operators encountered
+ opStack.push(nd);
+ Boolean skip = dispatchAndReturn(nd, opStack);
+
+ // save some positional state
+ Operator<? extends OperatorDesc> currentRoot = ctx.currentRootOperator;
+ Operator<? extends OperatorDesc> parentOfRoot = ctx.parentOfRoot;
+ BaseWork preceedingWork = ctx.preceedingWork;
+
+ if (skip == null || !skip) {
+ // move all the children to the front of queue
+ for (Node ch : children) {
+
+ // and restore the state before walking each child
+ ctx.currentRootOperator = currentRoot;
+ ctx.parentOfRoot = parentOfRoot;
+ ctx.preceedingWork = preceedingWork;
+
+ walk(ch);
+ }
+ }
+
+ // done with this operator
+ opStack.pop();
+ }
+}
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL:
http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1549374&r1=1549373&r2=1549374&view=diff
==============================================================================
---
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
(original)
+++
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
Mon Dec 9 03:00:13 2013
@@ -34,12 +34,10 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.ForwardOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -113,12 +111,8 @@ public class TezCompiler extends TaskCom
ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
GenTezWork genTezWork = new GenTezWork();
- // Sequence of TableScan operators to be walked
- Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
- deque.addAll(pCtx.getTopOps().values());
-
GenTezProcContext procCtx = new GenTezProcContext(
- conf, tempParseContext, mvTask, rootTasks, inputs, outputs, deque);
+ conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack.
@@ -147,46 +141,12 @@ public class TezCompiler extends TaskCom
}
});
- opRules.put(new RuleRegExp("Setup table scan",
- TableScanOperator.getOperatorName() + "%"), new NodeProcessor()
- {
- @Override
- public Object process(Node n, Stack<Node> s,
- NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- GenTezProcContext context = (GenTezProcContext) procCtx;
- TableScanOperator tableScan = (TableScanOperator) n;
- LOG.debug("TableScan operator ("+tableScan
- +"). Number of branches: "+tableScan.getNumChild());
- context.lastRootOfMultiChildOperator.push(tableScan);
- context.currentBranchCount.push(tableScan.getNumChild());
- context.lastWorkForMultiChildOperator.push(null);
- return null;
- }
- });
-
- opRules.put(new RuleRegExp("Handle Forward opertor",
- ForwardOperator.getOperatorName() + "%"), new NodeProcessor()
- {
- @Override
- public Object process(Node n, Stack<Node> s,
- NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- GenTezProcContext context = (GenTezProcContext) procCtx;
- ForwardOperator forward = (ForwardOperator) n;
- LOG.debug("Forward operator ("+forward+
- "). Number of branches: "+forward.getNumChild());
- context.lastRootOfMultiChildOperator.push(context.currentRootOperator);
- context.currentBranchCount.push(forward.getNumChild());
- context.lastWorkForMultiChildOperator.push(null);
- return null;
- }
- });
-
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
List<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pCtx.getTopOps().values());
- GraphWalker ogw = new TezWalker(disp);
+ GraphWalker ogw = new GenTezWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
}