Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Mar 4 18:17:39 2016 @@ -19,13 +19,21 @@ package org.apache.pig.backend.hadoop.ex import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; +import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; @@ -35,10 +43,19 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; +import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; +import org.apache.pig.backend.hadoop.hbase.HBaseStorage; +import org.apache.pig.builtin.AvroStorage; +import org.apache.pig.builtin.JsonStorage; +import org.apache.pig.builtin.OrcStorage; +import org.apache.pig.builtin.PigStorage; import org.apache.pig.builtin.RoundRobinPartitioner; +import org.apache.pig.builtin.mock.Storage; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.ReverseDependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -63,8 +80,66 @@ import org.apache.tez.runtime.library.ou */ public class UnionOptimizer extends TezOpPlanVisitor { - public UnionOptimizer(TezOperPlan plan) { + private static final Log LOG = LogFactory.getLog(UnionOptimizer.class); + private TezOperPlan tezPlan; + private static Set<String> builtinSupportedStoreFuncs = new HashSet<String>(); + private List<String> supportedStoreFuncs; + private List<String> unsupportedStoreFuncs; + + static { + builtinSupportedStoreFuncs.add(PigStorage.class.getName()); + builtinSupportedStoreFuncs.add(JsonStorage.class.getName()); + builtinSupportedStoreFuncs.add(OrcStorage.class.getName()); + builtinSupportedStoreFuncs.add(HBaseStorage.class.getName()); + builtinSupportedStoreFuncs.add(AvroStorage.class.getName()); + builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage"); + builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.CSVExcelStorage"); /* piggybank CSVExcelStorage lives in org.apache.pig.piggybank.storage, not the avro subpackage; the name must match getClass().getName() exactly */ + builtinSupportedStoreFuncs.add(Storage.class.getName()); + } 
+ public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) { super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + tezPlan = plan; + this.supportedStoreFuncs = supportedStoreFuncs; + this.unsupportedStoreFuncs = unsupportedStoreFuncs; + } + + public static boolean isOptimizable(TezOperator tezOp, + List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs) + throws VisitorException { + if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) { + return false; + } + // Two vertices separately ranking with 1 to n and writing to output directly + // will make each rank repeate twice which is wrong. Rank always needs to be + // done from single vertex to have the counting correct. + if (tezOp.isRankCounter()) { + return false; + } + if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) { + List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class); + for (POStoreTez store : stores) { + String name = store.getStoreFunc().getClass().getName(); + if (unsupportedStoreFuncs != null + && unsupportedStoreFuncs.contains(name)) { + return false; + } + if (supportedStoreFuncs != null + && !supportedStoreFuncs.contains(name)) { + if (!builtinSupportedStoreFuncs.contains(name)) { + LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS + + " does not contain " + name + + " and so disabling union optimization. There will be some performance degradation. " + + "If your storefunc does not hardcode part file names and can work with multiple vertices writing to the output location," + + " run pig with -D" + + PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS + + "=<Comma separated list of fully qualified StoreFunc class names> to enable the optimization. 
Refer PIG-4691"); + return false; + } + } + } + } + return true; } @Override @@ -73,55 +148,104 @@ public class UnionOptimizer extends TezO return; } - if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) { + if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) { return; } TezOperator unionOp = tezOp; - String unionOpKey = unionOp.getOperatorKey().toString(); String scope = unionOp.getOperatorKey().scope; - TezOperPlan tezPlan = getPlan(); + PhysicalPlan unionOpPlan = unionOp.plan; - //TODO: PIG-3856 Handle replicated join. Replicate join input that was broadcast to union vertex + // TODO: PIG-3856 Handle replicated join and skewed join sample. + // Replicate join small table/skewed join sample that was broadcast to union vertex // now needs to be broadcast to all the union predecessors. How do we do that?? // Wait for shared edge and do it or write multiple times?? - // For now don't optimize - // Create a copy as disconnect while iterating modifies the original list + // For now don't optimize except in the case of Split where we need to write only once + + Set<OperatorKey> uniqueUnionMembers = new HashSet<OperatorKey>(unionOp.getUnionMembers()); List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp)); - if (predecessors.size() > unionOp.getVertexGroupMembers().size()) { - return; + List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null ? 
null + : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp)); + + + if (successors != null && uniqueUnionMembers.size() > 1) { + for (TezOperator succ : successors) { + for (TezOperator pred : predecessors) { + if (succ.inEdges.containsKey(pred.getOperatorKey())) { + // Stop here, we cannot convert the node into vertex group + // Otherwise, we will end up with a parallel edge between pred + // and succ + return; + } + } + } } + if (predecessors.size() > unionOp.getUnionMembers().size() + && uniqueUnionMembers.size() != 1) { + return; // TODO: PIG-3856 + } + if (uniqueUnionMembers.size() == 1) { + // We actually don't need VertexGroup in this case. The multiple + // sub-plans of Split can write to same MROutput or the Tez LogicalOutput + OperatorKey splitPredKey = uniqueUnionMembers.iterator().next(); + TezOperator splitPredOp = tezPlan.getOperator(splitPredKey); + PhysicalPlan splitPredPlan = splitPredOp.plan; + if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It has to be. 
But check anyways + + try { + connectUnionNonMemberPredecessorsToSplit(unionOp, splitPredOp, predecessors); + + // Remove POShuffledValueInputTez from union plan root + unionOpPlan.remove(unionOpPlan.getRoots().get(0)); + // Clone union plan into split subplans + for (int i=0; i < Collections.frequency(unionOp.getUnionMembers(), splitPredKey); i++ ) { + cloneAndMergeUnionPlan(unionOp, splitPredOp); + } + copyOperatorProperties(splitPredOp, unionOp); + tezPlan.disconnect(splitPredOp, unionOp); - PhysicalPlan unionOpPlan = unionOp.plan; + connectSplitOpToUnionSuccessors(unionOp, splitPredOp, successors); + } catch (PlanException e) { + throw new VisitorException(e); + } - // Union followed by Split followed by Store could have multiple stores + //Remove union operator from the plan + tezPlan.remove(unionOp); + return; + } else { + throw new VisitorException("Expected POSplit but found " + splitPredPlan.getLeaves().get(0)); + } + } + + // Create vertex group operator for each store. Union followed by Split + // followed by Store could have multiple stores List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class); TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()]; - List<TezOperator> succs = tezPlan.getSuccessors(unionOp); - // Create a copy as disconnect while iterating modifies the original list - List<TezOperator> successors = succs == null ? 
null : new ArrayList<TezOperator>(succs); - for (int i=0; i < storeVertexGroupOps.length; i++) { TezOperator existingVertexGroup = null; if (successors != null) { for (TezOperator succ : successors) { - if (succ.isVertexGroup() && succ.getVertexGroupInfo().getSFile().equals(unionStoreOutputs.get(i).getSFile())) { + if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) { existingVertexGroup = succ; } } } if (existingVertexGroup != null) { storeVertexGroupOps[i] = existingVertexGroup; + existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey()); + existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers()); + existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey()); } else { storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i))); storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile()); - storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers()); + storeVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers())); tezPlan.add(storeVertexGroupOps[i]); } } - // Case of split, orderby, skewed join, rank, etc will have multiple outputs + // Create vertex group operator for each output. 
Case of split, orderby, + // skewed join, rank, etc will have multiple outputs List<TezOutput> unionOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class); // One TezOutput can write to multiple LogicalOutputs (POCounterTez, POValueOutputTez, etc) List<String> unionOutputKeys = new ArrayList<String>(); @@ -133,133 +257,344 @@ public class UnionOptimizer extends TezO unionOutputKeys.add(key); } } - - // Create vertex group operator for each output TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()]; String[] newOutputKeys = new String[unionOutputKeys.size()]; for (int i=0; i < outputVertexGroupOps.length; i++) { outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope)); outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo()); outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i)); - outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers()); + outputVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers())); newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString(); tezPlan.add(outputVertexGroupOps[i]); } + // Change plan from Predecessors -> Union -> Successor(s) to + // Predecessors -> Vertex Group(s) -> Successor(s) try { - - // Clone plan of union and merge it into the predecessor operators // Remove POShuffledValueInputTez from union plan root unionOpPlan.remove(unionOpPlan.getRoots().get(0)); - for (OperatorKey predKey : unionOp.getVertexGroupMembers()) { + + for (OperatorKey predKey : unionOp.getUnionMembers()) { TezOperator pred = tezPlan.getOperator(predKey); - PhysicalPlan predPlan = pred.plan; - PhysicalOperator predLeaf = predPlan.getLeaves().get(0); - // if predLeaf not POValueOutputTez - if (predLeaf instanceof POSplit) { - // Find the subPlan that connects to the union operator - predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey); - predLeaf = predPlan.getLeaves().get(0); - } - - 
PhysicalPlan clonePlan = unionOpPlan.clone(); - //Clone changes the operator keys - List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class); - - // Remove POValueOutputTez from predecessor leaf - predPlan.remove(predLeaf); - boolean isEmptyPlan = predPlan.isEmpty(); - if (!isEmptyPlan) { - predLeaf = predPlan.getLeaves().get(0); - } - predPlan.merge(clonePlan); - if (!isEmptyPlan) { - predPlan.connect(predLeaf, clonePlan.getRoots().get(0)); - } - - // Connect predecessor to the storeVertexGroups - int i = 0; - for (TezOperator storeVertexGroup : storeVertexGroupOps) { - storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); - //Set the output key of cloned POStore to that of the initial union POStore. - clonedUnionStoreOutputs.get(i).setOutputKey( - storeVertexGroup.getVertexGroupInfo().getStore() - .getOperatorKey().toString()); - pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(), - storeVertexGroup.getOperatorKey()); - tezPlan.connect(pred, storeVertexGroup); - } - - for (TezOperator outputVertexGroup : outputVertexGroupOps) { - outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); - tezPlan.connect(pred, outputVertexGroup); - } - - copyOperatorProperties(pred, unionOp); - tezPlan.disconnect(pred, unionOp); - } - - List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>(); - for (TezOutput tezOutput : unionOutputs) { - if (tezOutput instanceof POValueOutputTez) { - valueOnlyOutputs.add(tezOutput); - } - } - // Connect to outputVertexGroupOps - // Copy output edges of union -> successor to predecessor->successor, vertexgroup -> successor - // and connect vertexgroup -> successor in the plan. - for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) { - TezOperator succOp = tezPlan.getOperator(entry.getKey()); - // Case of union followed by union. - // unionOp.outEdges will not point to vertex group, but to its output. 
- // So find the vertex group if there is one. - TezOperator succOpVertexGroup = null; - for (TezOperator succ : successors) { - if (succ.isVertexGroup() - && succ.getVertexGroupInfo().getOutput() - .equals(succOp.getOperatorKey().toString())) { - succOpVertexGroup = succ; - break; - } - } - TezEdgeDescriptor edge = entry.getValue(); - // Edge cannot be one to one as it will get input from two or - // more union predecessors. Change it to SCATTER_GATHER - if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) { - edge.dataMovementType = DataMovementType.SCATTER_GATHER; - edge.partitionerClass = RoundRobinPartitioner.class; - edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); - edge.inputClassName = UnorderedKVInput.class.getName(); - } - TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())]; - for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) { - TezOperator pred = tezPlan.getOperator(predKey); - // Keep the output edge directly to successor - // Don't need to keep output edge for vertexgroup - pred.outEdges.put(entry.getKey(), edge); - succOp.inEdges.put(predKey, edge); - if (succOpVertexGroup != null) { - succOpVertexGroup.getVertexGroupMembers().add(predKey); - succOpVertexGroup.getVertexGroupInfo().addInput(predKey); - // Connect directly to the successor vertex group - tezPlan.disconnect(pred, vertexGroupOp); - tezPlan.connect(pred, succOpVertexGroup); + PhysicalPlan clonePlan = cloneAndMergeUnionPlan(unionOp, pred); + connectPredecessorsToVertexGroups(unionOp, pred, clonePlan, + storeVertexGroupOps, outputVertexGroupOps); + } + + connectVertexGroupsToSuccessors(unionOp, successors, + unionOutputKeys, outputVertexGroupOps); + + replaceSuccessorInputsAndDisconnect(unionOp, successors, unionOutputKeys, newOutputKeys); + + //Remove union operator from the plan + tezPlan.remove(unionOp); + } catch (VisitorException e) { + throw e; + } catch (Exception e) { + throw new 
VisitorException(e); + } + + } + + /** + * Connect the predecessors of the union which are not members of the union + * (usually FRJoin replicated table orSkewedJoin sample) to the Split op + * which is the only member of the union. Disconnect those predecessors from the union. + * + * Replace the output keys of those predecessors with the split operator + * key instead of the union operator key. + * + * @param unionOp Union operator + * @param splitPredOp Split operator which is the only member of the union and its predecessor + * @param unionPredecessors Predecessors of the union including the split operator + * @throws PlanException + * @throws VisitorException + */ + private void connectUnionNonMemberPredecessorsToSplit(TezOperator unionOp, + TezOperator splitPredOp, + List<TezOperator> unionPredecessors) throws PlanException, VisitorException { + String unionOpKey = unionOp.getOperatorKey().toString(); + OperatorKey splitPredKey = splitPredOp.getOperatorKey(); + for (TezOperator pred : unionPredecessors) { + + if (!pred.getOperatorKey().equals(splitPredKey)) { //Skip splitPredOp which is also a predecessor + // Get actual predecessors if predecessor is a vertex group + TezOperator predVertexGroup = null; + List<TezOperator> actualPreds = new ArrayList<TezOperator>(); + if (pred.isVertexGroup()) { + predVertexGroup = pred; + for (OperatorKey opKey : pred.getVertexGroupMembers()) { + // There should not be multiple levels of vertex group. So no recursion required. 
+ actualPreds.add(tezPlan.getOperator(opKey)); } + tezPlan.disconnect(predVertexGroup, unionOp); + tezPlan.connect(predVertexGroup, splitPredOp); + } else { + actualPreds.add(pred); } - if (succOpVertexGroup != null) { - succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey()); - succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey()); - //Discard the new vertex group created - tezPlan.remove(vertexGroupOp); + + for (TezOperator actualPred : actualPreds) { + + TezCompilerUtil.replaceOutput(actualPred, unionOpKey, splitPredKey.toString()); + + TezEdgeDescriptor edge = actualPred.outEdges.remove(unionOp.getOperatorKey()); + if (edge == null) { + throw new VisitorException("Edge description is empty"); + } + actualPred.outEdges.put(splitPredKey, edge); + splitPredOp.inEdges.put(actualPred.getOperatorKey(), edge); + if (predVertexGroup == null) { + // Disconnect FRJoin table/SkewedJoin sample edge to + // union op and connect to POSplit + tezPlan.disconnect(actualPred, unionOp); + tezPlan.connect(actualPred, splitPredOp); + } + } + } + } + } + + /** + * Connect the split operator to the successors of the union operators and update the edges. + * Also change the inputs of the successor from the union operator to the split operator. 
+ * + * @param unionOp Union operator + * @param splitPredOp Split operator which is the only member of the union + * @param successors Successors of the union operator + * @throws PlanException + * @throws VisitorException + */ + private void connectSplitOpToUnionSuccessors(TezOperator unionOp, + TezOperator splitPredOp, List<TezOperator> successors) + throws PlanException, VisitorException { + String unionOpKey = unionOp.getOperatorKey().toString(); + String splitPredOpKey = splitPredOp.getOperatorKey().toString(); + List<TezOperator> splitSuccessors = tezPlan.getSuccessors(splitPredOp); + if (successors != null) { + for (TezOperator succ : successors) { + TezOperator successorVertexGroup = null; + boolean removeSuccessorVertexGroup = false; + List<TezOperator> actualSuccs = new ArrayList<TezOperator>(); + if (succ.isVertexGroup()) { + successorVertexGroup = succ; + if (tezPlan.getSuccessors(successorVertexGroup) != null) { + // There should not be multiple levels of vertex group. So no recursion required. 
+ actualSuccs.addAll(tezPlan.getSuccessors(successorVertexGroup)); + } + int index = succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey()); + while (index > -1) { + succ.getVertexGroupMembers().set(index, splitPredOp.getOperatorKey()); + index = succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey()); + } + // Store vertex group + POStore store = successorVertexGroup.getVertexGroupInfo().getStore(); + if (store != null) { + //Clone changes the operator keys + List<POStoreTez> storeOutputs = PlanHelper.getPhysicalOperators(splitPredOp.plan, POStoreTez.class); + for (POStoreTez storeOut : storeOutputs) { + if (storeOut.getOutputKey().equals(store.getOperatorKey().toString())) { + splitPredOp.addVertexGroupStore(storeOut.getOperatorKey(), successorVertexGroup.getOperatorKey()); + } + } + } + tezPlan.disconnect(unionOp, successorVertexGroup); + Set<OperatorKey> uniqueVertexGroupMembers = new HashSet<OperatorKey>(succ.getVertexGroupMembers()); + if (uniqueVertexGroupMembers.size() == 1) { + //Only splitPredOp is member of the vertex group. 
Get rid of the vertex group + removeSuccessorVertexGroup = true; + } else { + // Avoid connecting multiple times in case of union + self join + if (splitSuccessors == null || !splitSuccessors.contains(successorVertexGroup)) { + tezPlan.connect(splitPredOp, successorVertexGroup); + } + } } else { - tezPlan.connect(vertexGroupOp, succOp); + actualSuccs.add(succ); + } + + // Store vertex group + if (actualSuccs.isEmpty() && removeSuccessorVertexGroup) { + splitPredOp.removeVertexGroupStore(successorVertexGroup.getOperatorKey()); + tezPlan.remove(successorVertexGroup); } + + for (TezOperator actualSucc : actualSuccs) { + + TezCompilerUtil.replaceInput(actualSucc, unionOpKey, splitPredOpKey); + + TezEdgeDescriptor edge = actualSucc.inEdges.remove(unionOp.getOperatorKey()); + if (edge == null) { + throw new VisitorException("Edge description is empty"); + } + actualSucc.inEdges.put(splitPredOp.getOperatorKey(), edge); + splitPredOp.outEdges.put(actualSucc.getOperatorKey(), edge); + if (successorVertexGroup == null || removeSuccessorVertexGroup) { + if (removeSuccessorVertexGroup) { + // Changes plan from SplitOp -> Union -> VertexGroup - > Successor + // to SplitOp -> Successor + tezPlan.disconnect(successorVertexGroup, actualSucc); + tezPlan.remove(successorVertexGroup); + TezCompilerUtil.replaceInput(actualSucc, successorVertexGroup.getOperatorKey().toString(), splitPredOpKey); + } else { + // Changes plan from SplitOp -> Union -> Successor + // to SplitOp -> Successor + tezPlan.disconnect(unionOp, actualSucc); + } + // Avoid connecting multiple times in case of union + self join + if (splitSuccessors == null || !splitSuccessors.contains(actualSucc)) { + tezPlan.connect(splitPredOp, actualSucc); + } + } + } + } + } + } + + /** + * Clone plan of union and merge it into the predecessor operator + * + * @param unionOp Union operator + * @param predOp Predecessor operator of union to which union plan should be merged to + */ + private PhysicalPlan 
cloneAndMergeUnionPlan(TezOperator unionOp, TezOperator predOp) throws VisitorException { + try { + PhysicalPlan predPlan = predOp.plan; + PhysicalOperator predLeaf = predPlan.getLeaves().get(0); + // if predLeaf not POValueOutputTez + if (predLeaf instanceof POSplit) { + // Find the subPlan that connects to the union operator + predPlan = getUnionPredPlanFromSplit(predPlan, unionOp.getOperatorKey().toString()); + predLeaf = predPlan.getLeaves().get(0); + } + PhysicalPlan clonePlan = unionOp.plan.clone(); + + // Remove POValueOutputTez from predecessor leaf + predPlan.remove(predLeaf); + boolean isEmptyPlan = predPlan.isEmpty(); + if (!isEmptyPlan) { + predLeaf = predPlan.getLeaves().get(0); } + predPlan.merge(clonePlan); + if (!isEmptyPlan) { + predPlan.connect(predLeaf, clonePlan.getRoots().get(0)); + } + return clonePlan; } catch (Exception e) { throw new VisitorException(e); } + } + /** + * Connects the unionOp predecessor to the store vertex groups and the output vertex groups + * and disconnects it from the unionOp. 
+ * + * @param pred Predecessor of union which will be made part of the vertex group + * @param unionOp Union operator + * @param predClonedUnionPlan Cloned plan of the union merged to the predecessor + * @param storeVertexGroupOps Store vertex groups to connect to + * @param outputVertexGroupOps Tez LogicalOutput vertex groups to connect to + */ + public void connectPredecessorsToVertexGroups(TezOperator unionOp, + TezOperator pred, PhysicalPlan predClonedUnionPlan, + TezOperator[] storeVertexGroupOps, + TezOperator[] outputVertexGroupOps) throws VisitorException,PlanException { + + //Clone changes the operator keys + List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(predClonedUnionPlan, POStoreTez.class); + + // Connect predecessor to the storeVertexGroups + int i = 0; + for (TezOperator storeVertexGroup : storeVertexGroupOps) { + storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); + pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(), + storeVertexGroup.getOperatorKey()); + tezPlan.connect(pred, storeVertexGroup); + } + + for (TezOperator outputVertexGroup : outputVertexGroupOps) { + outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey()); + tezPlan.connect(pred, outputVertexGroup); + } + + copyOperatorProperties(pred, unionOp); + tezPlan.disconnect(pred, unionOp); + } + + /** + * Connect vertexgroup operator to successor operator in the plan. + * + * Copy the output edge between union operator and successor to between + * predecessors and successor. Predecessor output key and output edge points + * to successor so that we have all the edge configuration, but they are + * connected to the vertex group in the plan. 
+ * + * @param unionOp Union operator + * @param successors Successors of the union operator + * @param unionOutputKeys Output keys of union + * @param outputVertexGroupOp Tez LogicalOutput vertex groups corresponding to the output keys + * + * @throws PlanException + */ + private void connectVertexGroupsToSuccessors(TezOperator unionOp, + List<TezOperator> successors, List<String> unionOutputKeys, + TezOperator[] outputVertexGroupOps) throws PlanException { + // Connect to outputVertexGroupOps + for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) { + TezOperator succOp = tezPlan.getOperator(entry.getKey()); + // Case of union followed by union. + // unionOp.outEdges will not point to vertex group, but to its output. + // So find the vertex group if there is one. + TezOperator succOpVertexGroup = null; + for (TezOperator succ : successors) { + if (succ.isVertexGroup() + && succOp.getOperatorKey().toString() + .equals(succ.getVertexGroupInfo().getOutput())) { + succOpVertexGroup = succ; + break; + } + } + TezEdgeDescriptor edge = entry.getValue(); + // Edge cannot be one to one as it will get input from two or + // more union predecessors. 
Change it to SCATTER_GATHER + if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) { + edge.dataMovementType = DataMovementType.SCATTER_GATHER; + edge.partitionerClass = RoundRobinPartitioner.class; + edge.outputClassName = UnorderedPartitionedKVOutput.class.getName(); + edge.inputClassName = UnorderedKVInput.class.getName(); + } + TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())]; + for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) { + TezOperator pred = tezPlan.getOperator(predKey); + // Keep the output edge directly to successor + // Don't need to keep output edge for vertexgroup + pred.outEdges.put(entry.getKey(), edge); + succOp.inEdges.put(predKey, edge); + if (succOpVertexGroup != null) { + succOpVertexGroup.getVertexGroupMembers().add(predKey); + succOpVertexGroup.getVertexGroupInfo().addInput(predKey); + // Connect directly to the successor vertex group + tezPlan.disconnect(pred, vertexGroupOp); + tezPlan.connect(pred, succOpVertexGroup); + } + } + if (succOpVertexGroup != null) { + succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey()); + succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey()); + //Discard the new vertex group created + tezPlan.remove(vertexGroupOp); + } else { + tezPlan.connect(vertexGroupOp, succOp); + } + } + } + + private void replaceSuccessorInputsAndDisconnect(TezOperator unionOp, + List<TezOperator> successors, + List<String> unionOutputKeys, + String[] newOutputKeys) + throws VisitorException { if (successors != null) { + String unionOpKey = unionOp.getOperatorKey().toString(); // Successor inputs should now point to the vertex groups. 
for (TezOperator succ : successors) { LinkedList<TezInput> inputs = PlanHelper.getPhysicalOperators(succ.plan, TezInput.class); @@ -271,16 +606,27 @@ public class UnionOptimizer extends TezO } } } + + List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class); + for (POUserFunc userFunc : userFuncs) { + if (userFunc.getFunc() instanceof ReadScalarsTez) { + TezInput tezInput = (TezInput)userFunc.getFunc(); + for (String inputKey : tezInput.getTezInputs()) { + if (inputKey.equals(unionOpKey)) { + tezInput.replaceInput(inputKey, + newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]); + userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs()); + } + } + } + } + tezPlan.disconnect(unionOp, succ); } } - - //Remove union operator from the plan - tezPlan.remove(unionOp); - } - private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) { + private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) throws VisitorException { pred.UDFs.addAll(unionOp.UDFs); pred.scalars.addAll(unionOp.scalars); // Copy only map side properties. For eg: crossKeys. @@ -292,6 +638,17 @@ public class UnionOptimizer extends TezO } } pred.copyFeatures(unionOp, Arrays.asList(new OPER_FEATURE[]{OPER_FEATURE.UNION})); + + // For skewed join right input + if (unionOp.getSampleOperator() != null) { + if (pred.getSampleOperator() == null) { + pred.setSampleOperator(unionOp.getSampleOperator()); + } else if (!pred.getSampleOperator().equals(unionOp.getSampleOperator())) { + throw new VisitorException("Conflicting sample operators " + + pred.getSampleOperator().toString() + " and " + + unionOp.getSampleOperator().toString()); + } + } } public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java Fri Mar 4 18:17:39 2016 @@ -67,9 +67,10 @@ public class ReadScalarsTez extends Eval public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException { String cacheKey = "scalar-" + inputKey; - Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); - if (cacheValue != null) { - t = (Tuple) cacheValue; + String cacheKeyPresent = "scalar-present" + inputKey; + + if (ObjectCache.getInstance().retrieve(cacheKeyPresent) != null) { + t = (Tuple)ObjectCache.getInstance().retrieve(cacheKey); return; } input = inputs.get(inputKey); @@ -84,7 +85,8 @@ public class ReadScalarsTez extends Eval if (reader.next()) { String msg = "Scalar has more than one row in the output. " + "1st : " + first + ", 2nd :" - + reader.getCurrentValue(); + + reader.getCurrentValue() + + " (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )"; throw new ExecException(msg); } } else { @@ -94,12 +96,16 @@ public class ReadScalarsTez extends Eval } catch (Exception e) { throw new ExecException(e); } + ObjectCache.getInstance().cache(cacheKeyPresent, Boolean.TRUE); ObjectCache.getInstance().cache(cacheKey, t); log.info("Cached scalar in Tez ObjectRegistry with vertex scope. 
cachekey=" + cacheKey); } @Override public Object exec(Tuple input) throws IOException { + if (t == null) { + return null; + } int pos = (Integer) input.get(0); Object obj = t.get(pos); return obj; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Fri Mar 4 18:17:39 2016 @@ -24,6 +24,8 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; @@ -68,7 +70,7 @@ public class PartitionerDefinedVertexMan } @Override - public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { + public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception { // There could be multiple partition vertex sending VertexManagerEvent // Only need to setVertexParallelism once if (isParallelismSet) { @@ -86,14 +88,14 @@ public class PartitionerDefinedVertexMan if (dynamicParallelism!=currentParallelism) { LOG.info("Pig Partitioner Defined Vertex Manager: reset parallelism to " + dynamicParallelism + " from " + currentParallelism); - Map<String, EdgeManagerPluginDescriptor> edgeManagers = - new 
HashMap<String, EdgeManagerPluginDescriptor>(); - for(String vertex : getContext().getInputVertexEdgeProperties().keySet()) { - EdgeManagerPluginDescriptor edgeManagerDescriptor = - EdgeManagerPluginDescriptor.create(ScatterGatherEdgeManager.class.getName()); - edgeManagers.put(vertex, edgeManagerDescriptor); + Map<String, EdgeProperty> edgeManagers = new HashMap<String, EdgeProperty>(); + for(Map.Entry<String,EdgeProperty> entry : getContext().getInputVertexEdgeProperties().entrySet()) { + EdgeProperty edge = entry.getValue(); + edge = EdgeProperty.create(DataMovementType.SCATTER_GATHER, edge.getDataSourceType(), edge.getSchedulingType(), + edge.getEdgeSource(), edge.getEdgeDestination()); + edgeManagers.put(entry.getKey(), edge); } - getContext().setVertexParallelism(dynamicParallelism, null, edgeManagers, null); + getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java Fri Mar 4 18:17:39 2016 @@ -20,21 +20,34 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.List; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.UDFContext; public class PigOutputFormatTez extends PigOutputFormat { + + @Override + public RecordWriter<WritableComparable, Tuple> getRecordWriter( + TaskAttemptContext taskattemptcontext) throws IOException, + InterruptedException { + resetUDFContextForThreadReuse(); + return super.getRecordWriter(taskattemptcontext); + } + @Override public OutputCommitter getOutputCommitter( TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException { + resetUDFContextForThreadReuse(); setupUdfEnvAndStores(taskattemptcontext); // we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop - this instance @@ -44,6 +57,21 @@ public class PigOutputFormatTez extends reduceStores); } + public static void resetUDFContextForThreadReuse() { + // On the Tez AM, MROutput OutputCommitters are initialized and setupJob + // called on them in a loop in the same thread. + // commitJob/abortJob can be called from any thread based on events received from vertices + + // On the Tez tasks, it initializes different inputs/outputs in different Initializer threads + // by submitting them to a thread pool. Even though threadpoolsize=numInputs+numOutputs + // a thread can be reused. + + // Since deserialized UDFContext from input and output payload contains + // information only for that input or output to reduce payload sizes, we need to + // ensure it is deserialized every time before use in a thread to get the right one.
+ UDFContext.getUDFContext().reset(); + } + public static class PigOutputCommitterTez extends PigOutputCommitter { public PigOutputCommitterTez(TaskAttemptContext context, @@ -54,39 +82,35 @@ public class PigOutputFormatTez extends @Override public void setupJob(JobContext context) throws IOException { - cleanupForContainerReuse(); + resetUDFContextForThreadReuse(); try { super.setupJob(context); } finally { - cleanupForContainerReuse(); + resetUDFContextForThreadReuse(); } } @Override public void commitJob(JobContext context) throws IOException { - cleanupForContainerReuse(); + resetUDFContextForThreadReuse(); try { super.commitJob(context); } finally { - cleanupForContainerReuse(); + resetUDFContextForThreadReuse(); } } @Override public void abortJob(JobContext context, State state) throws IOException { - cleanupForContainerReuse(); + resetUDFContextForThreadReuse(); try { super.abortJob(context, state); } finally { - cleanupForContainerReuse(); + resetUDFContextForThreadReuse(); } } - private void cleanupForContainerReuse() { - UDFContext.getUDFContext().reset(); - } - } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Mar 4 18:17:39 2016 @@ -30,13 +30,17 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobContext; import 
org.apache.pig.JVMReuseImpl; import org.apache.pig.PigConstants; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -57,6 +61,7 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.PigStatusReporter; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.mapreduce.hadoop.MRConfig; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; import org.apache.tez.runtime.api.Event; @@ -116,6 +121,11 @@ public class PigProcessor extends Abstra // Reset static variables cleared for avoiding OOM. new JVMReuseImpl().cleanupStaticData(); + // Set an empty reporter for now. 
Once we go to Tez 0.8 + // which adds support for mapreduce like progress (TEZ-808), + // we need to call progress on Tez API + PhysicalOperator.setReporter(new ProgressableReporter()); + UserPayload payload = getContext().getUserPayload(); conf = TezUtils.createConfFromUserPayload(payload); PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer @@ -124,6 +134,22 @@ public class PigProcessor extends Abstra // To determine front-end in UDFContext conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier()); + + // For compatibility with mapreduce. Some users use these configs in their UDF + // Copied logic from the tez class - org.apache.tez.mapreduce.output.MROutput + // Currently isMapperOutput is always false. Setting it to true produces empty output with MROutput + boolean isMapperOutput = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR, false); + TaskAttemptID taskAttemptId = org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl + .createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(), + getContext().getTaskVertexIndex(), getContext().getApplicationId().getId(), + getContext().getTaskIndex(), getContext().getTaskAttemptNumber(), isMapperOutput); + conf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString()); + conf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); + conf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput); + conf.setInt(JobContext.TASK_PARTITION, + taskAttemptId.getTaskID().getId()); + conf.set(JobContext.ID, taskAttemptId.getJobID().toString()); + conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex())); UDFContext.getUDFContext().addJobConf(conf); UDFContext.getUDFContext().deserialize(); @@ -161,7 +187,6 @@ public class PigProcessor extends Abstra @Override public void close() throws Exception { - execPlan = null; fileOutputs = null; leaf = null; @@ -173,6 +198,8 @@ public class PigProcessor extends Abstra // The Reporter and Context 
objects hold TezProcessorContextImpl // which holds input and its sort buffers which are huge. new JVMReuseImpl().cleanupStaticData(); + // Do only in close() and not initialize(). + UDFContext.staticDataCleanup(); } @Override @@ -193,8 +220,22 @@ public class PigProcessor extends Abstra leaf = leaves.get(0); } + LOG.info("Aliases being processed per job phase (AliasName[line,offset]): " + conf.get("pig.alias.location")); + runPipeline(leaf); + if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false")) + && !execPlan.endOfAllInput) { + // If there is a stream in the pipeline or if this map job belongs to merge-join we could + // potentially have more to process - so lets + // set the flag stating that all map input has been sent + // already and then lets run the pipeline one more time + // This will result in nothing happening in the case + // where there is no stream or it is not a merge-join in the pipeline + execPlan.endOfAllInput = true; + runPipeline(leaf); + } + // Calling EvalFunc.finish() UDFFinishVisitor finisher = new UDFFinishVisitor(execPlan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>( @@ -207,13 +248,15 @@ public class PigProcessor extends Abstra throw new VisitorException(msg, errCode, PigException.BUG, e); } - while (!getContext().canCommit()) { - Thread.sleep(100); - } - for (MROutput fileOutput : fileOutputs){ - fileOutput.flush(); - if (fileOutput.isCommitRequired()) { - fileOutput.commit(); + if (!fileOutputs.isEmpty()) { + while (!getContext().canCommit()) { + Thread.sleep(100); + } + for (MROutput fileOutput : fileOutputs){ + fileOutput.flush(); + if (fileOutput.isCommitRequired()) { + fileOutput.commit(); + } } } @@ -233,8 +276,8 @@ public class PigProcessor extends Abstra getContext().sendEvents(events); } } catch (Exception e) { - abortOutput(); LOG.error("Encountered exception while processing: ", e); + abortOutput(); throw e; } } @@ -243,7 +286,7 @@ public class PigProcessor extends Abstra for (MROutput 
fileOutput : fileOutputs){ try { fileOutput.abort(); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Encountered exception while aborting output", e); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java Fri Mar 4 18:17:39 2016 @@ -17,73 +17,14 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.runtime; -import java.util.Iterator; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.Tuple; -import org.apache.pig.impl.builtin.PartitionSkewedKeys; -import org.apache.pig.impl.util.Pair; +import org.apache.pig.backend.hadoop.executionengine.tez.util.TezRuntimeUtil; public class SkewedPartitionerTez extends SkewedPartitioner { - private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class); @Override protected void init() { - - Map<String, Object> distMap = null; - if (PigProcessor.sampleMap != null) { - // We've collected sampleMap in PigProcessor - distMap = PigProcessor.sampleMap; - } else { - LOG.info("Key distribution map is empty"); - inited = true; - return; - } - - long start = System.currentTimeMillis(); - - try { - // The distMap is structured as (key, min, max) where min, max - // being the index of the reducers - DataBag partitionList = 
(DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST); - totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS)); - Iterator<Tuple> it = partitionList.iterator(); - while (it.hasNext()) { - Tuple idxTuple = it.next(); - Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1); - Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2); - // Used to replace the maxIndex with the number of reducers - if (maxIndex < minIndex) { - maxIndex = totalReducers + maxIndex; - } - - Tuple keyT; - // if the join is on more than 1 key - if (idxTuple.size() > 3) { - // remove the last 2 fields of the tuple, i.e: minIndex and maxIndex and store - // it in the reducer map - Tuple keyTuple = tf.newTuple(); - for (int i=0; i < idxTuple.size() - 2; i++) { - keyTuple.append(idxTuple.get(i)); - } - keyT = keyTuple; - } else { - keyT = tf.newTuple(1); - keyT.set(0,idxTuple.get(0)); - } - // number of reducers - Integer cnt = maxIndex - minIndex; - // 1 is added to account for the 0 index - reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, cnt)); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - LOG.info("Initialized SkewedPartitionerTez. 
Time taken: " + (System.currentTimeMillis() - start)); + reducerMap = TezRuntimeUtil.readReduceMapFromSample(tf); inited = true; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Mar 4 18:17:39 2016 @@ -17,7 +17,10 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.util; +import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -25,30 +28,82 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.split.JobSplitWriter; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import 
org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.classification.InterfaceAudience; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; +import org.apache.tez.mapreduce.hadoop.InputSplitInfo; +import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk; +import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem; @InterfaceAudience.Private public class MRToTezHelper { private static final Log LOG = LogFactory.getLog(MRToTezHelper.class); + private static final String JOB_SPLIT_RESOURCE_NAME = MRJobConfig.JOB_SPLIT; + private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = MRJobConfig.JOB_SPLIT_METAINFO; + + private static Map<String, String> mrAMParamToTezAMParamMap = new HashMap<String, String>(); + private static Map<String, String> mrMapParamToTezVertexParamMap = new HashMap<String, String>(); + private static Map<String, String> mrReduceParamToTezVertexParamMap = new HashMap<String, String>(); private static List<String> mrSettingsToRetain = new ArrayList<String>(); + private static List<String> mrSettingsToRemove = new ArrayList<String>(); + private MRToTezHelper() { } static { + populateMRToTezParamsMap(); populateMRSettingsToRetain(); + populateMRSettingsToRemove(); + } + + private static void populateMRToTezParamsMap() { + + //AM settings + mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_VMEM_MB, TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB); + mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_CPU_VCORES, TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES); + mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS); + mrAMParamToTezAMParamMap.put(MRConfiguration.JOB_CREDENTIALS_BINARY, 
TezConfiguration.TEZ_CREDENTIALS_PATH); + mrAMParamToTezAMParamMap.put(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION); + + //Map settings + mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS); + mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED); + mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); + //TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 0.8 + mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency"); + //TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8 + mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); + + //Reduce settings + mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_MAX_ATTEMPTS, TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS); + mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED); + mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); + mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency"); + mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency"); + mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); } private static void populateMRSettingsToRetain() { @@ -70,25 +125,71 @@ public class MRToTezHelper { mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER); } + private static void populateMRSettingsToRemove() { + + // FileInputFormat.listStatus() on a task can cause job failure when run from Oozie + mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY); + + 
mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES); + mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_SIZES); + mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS); + mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES); + mrSettingsToRemove.add(MRJobConfig.CACHE_FILES); + mrSettingsToRemove.add(MRJobConfig.CACHE_FILES_SIZES); + mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_TIMESTAMPS); + mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_VISIBILITIES); + mrSettingsToRemove.add(MRJobConfig.CLASSPATH_FILES); + } + + private static void removeUnwantedSettings(Configuration tezConf, boolean isAMConf) { + + // It is good to clean up as many of the inapplicable settings as possible. + // Tez has configs set in multiple places: AM, DAG, Vertex, VertexManager + // Plugin, Tasks (Processor, Edge, every input and output, combiner) + // If conf size is bigger, it places heavy pressure on AM memory and is + // inefficient while sending over RPC to tasks + + for (String mrSetting : mrSettingsToRemove) { + tezConf.unset(mrSetting); + } + + Iterator<Entry<String, String>> iter = new Configuration(tezConf).iterator(); + while (iter.hasNext()) { + String key = iter.next().getKey(); + if (!isAMConf) { + // Keep the setting in AM conf to be able to connect back to the + // Oozie launcher job and look at the parameter values passed, + // but get rid of them for others + if (key.startsWith("oozie.")) { + tezConf.unset(key); + continue; + } + } + if (key.startsWith("dfs.datanode")) { + tezConf.unset(key); + } else if (key.startsWith("dfs.namenode")) { + tezConf.unset(key); + } else if (key.startsWith("yarn.nodemanager")) { + tezConf.unset(key); + } else if (key.startsWith("mapreduce.jobhistory")) { + tezConf.unset(key); + } else if (key.startsWith("mapreduce.jobtracker")) { + tezConf.unset(key); + } else if (key.startsWith("mapreduce.tasktracker")) { + tezConf.unset(key); + } + } + } + public static TezConfiguration getDAGAMConfFromMRConf( Configuration tezConf) { // Set
Tez parameters based on MR parameters. TezConfiguration dagAMConf = new TezConfiguration(tezConf); - Map<String, String> mrParamToDAGParamMap = DeprecatedKeys - .getMRToDAGParamMap(); - for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) { - if (dagAMConf.get(entry.getKey()) != null) { - dagAMConf.set(entry.getValue(), dagAMConf.get(entry.getKey())); - dagAMConf.unset(entry.getKey()); - if (LOG.isDebugEnabled()) { - LOG.debug("MR->DAG Translating MR key: " + entry.getKey() - + " to Tez key: " + entry.getValue() - + " with value " + dagAMConf.get(entry.getValue())); - } - } - } + + convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap()); + convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap); String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV); if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) { @@ -108,70 +209,144 @@ public class MRToTezHelper { YarnConfiguration.DEFAULT_QUEUE_NAME); dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName); - int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB, - MRJobConfig.DEFAULT_MR_AM_VMEM_MB); - int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES, - MRJobConfig.DEFAULT_MR_AM_CPU_VCORES); - dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, "" - + amMemMB); - dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, "" - + amCores); - dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS, tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB)); dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS, tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB)); - dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, "" - + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, - MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS)); - - if (tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY) != null) { - dagAMConf.setIfUnset(TezConfiguration.TEZ_CREDENTIALS_PATH, - 
tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY)); - } + // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available + dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5"); - //TODO: Strip out all MR settings + removeUnwantedSettings(dagAMConf, true); return dagAMConf; } /** + * Set config with Scope.Vertex in TezConfiguration on the vertex + * + * @param vertex Vertex on which config is to be set + * @param isMapVertex Whether map or reduce vertex. i.e root or intermediate/leaf vertex + * @param conf Config that contains the tez or equivalent mapreduce settings. + */ + public static void setVertexConfig(Vertex vertex, boolean isMapVertex, + Configuration conf) { + Map<String, String> configMapping = isMapVertex ? mrMapParamToTezVertexParamMap + : mrReduceParamToTezVertexParamMap; + for (Entry<String, String> dep : configMapping.entrySet()) { + + String value = conf.get(dep.getValue(), conf.get(dep.getKey())); + if (value != null) { + vertex.setConf(dep.getValue(), value); + LOG.debug("Setting " + dep.getValue() + " to " + value + + " for the vertex " + vertex.getName()); + } + } + } + + /** * Process the mapreduce configuration settings and * - copy as is the still required ones (like those used by FileInputFormat/FileOutputFormat) * - convert and set equivalent tez runtime settings * - handle compression related settings * - * @param conf Configuration on which the mapreduce settings will have to be transferred + * @param tezConf Configuration on which the mapreduce settings will have to be transferred * @param mrConf Configuration that contains mapreduce settings */ - public static void processMRSettings(Configuration conf, Configuration mrConf) { + public static void processMRSettings(Configuration tezConf, Configuration mrConf) { for (String mrSetting : mrSettingsToRetain) { if (mrConf.get(mrSetting) != null) { - conf.set(mrSetting, mrConf.get(mrSetting)); + tezConf.set(mrSetting, 
mrConf.get(mrSetting)); } } - JobControlCompiler.configureCompression(conf); - convertMRToTezRuntimeConf(conf, mrConf); + JobControlCompiler.configureCompression(tezConf); + convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap()); + removeUnwantedSettings(tezConf, false); } /** * Convert MR settings to Tez settings and set on conf. * - * @param conf Configuration on which MR equivalent Tez settings should be set + * @param tezConf Configuration on which MR equivalent Tez settings should be set * @param mrConf Configuration that contains MR settings + * @param mrToTezConfigMapping Mapping of MR config to equivalent Tez config */ - private static void convertMRToTezRuntimeConf(Configuration conf, Configuration mrConf) { - for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) { + private static void convertMRToTezConf(Configuration tezConf, Configuration mrConf, Map<String, String> mrToTezConfigMapping) { + for (Entry<String, String> dep : mrToTezConfigMapping.entrySet()) { if (mrConf.get(dep.getKey()) != null) { - conf.unset(dep.getKey()); - LOG.info("Setting " + dep.getValue() + " to " - + mrConf.get(dep.getKey()) + " from MR setting " - + dep.getKey()); - conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey())); + if (tezConf.get(dep.getValue()) == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Setting " + dep.getValue() + " to " + + mrConf.get(dep.getKey()) + " from MR setting " + + dep.getKey()); + } + tezConf.set(dep.getValue(), mrConf.get(dep.getKey())); + } + tezConf.unset(dep.getKey()); } } } + /** + * Write input splits (job.split and job.splitmetainfo) to disk + */ + public static InputSplitInfoDisk writeInputSplitInfoToDisk( + InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf, + FileSystem fs) throws IOException, InterruptedException { + + InputSplit[] splits = infoMem.getNewFormatSplits(); + JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits); + + return new 
InputSplitInfoDisk( + JobSubmissionFiles.getJobSplitFile(inputSplitsDir), + JobSubmissionFiles.getJobSplitMetaFile(inputSplitsDir), + splits.length, infoMem.getTaskLocationHints(), + jobConf.getCredentials()); + } + + /** + * Exact copy of private method from org.apache.tez.mapreduce.hadoop.MRInputHelpers + * + * Update provided localResources collection with the required local + * resources needed by MapReduce tasks with respect to Input splits. + * + * @param fs Filesystem instance to access status of splits related files + * @param inputSplitInfo Information on location of split files + * @param localResources LocalResources collection to be updated + * @throws IOException + */ + public static void updateLocalResourcesForInputSplits( + FileSystem fs, + InputSplitInfo inputSplitInfo, + Map<String, LocalResource> localResources) throws IOException { + if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) { + throw new RuntimeException("LocalResources already contains a" + + " resource named " + JOB_SPLIT_RESOURCE_NAME); + } + if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) { + throw new RuntimeException("LocalResources already contains a" + + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME); + } + + FileStatus splitFileStatus = + fs.getFileStatus(inputSplitInfo.getSplitsFile()); + FileStatus metaInfoFileStatus = + fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile()); + localResources.put(JOB_SPLIT_RESOURCE_NAME, + LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()), + LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION, + splitFileStatus.getLen(), splitFileStatus.getModificationTime())); + localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME, + LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath( + inputSplitInfo.getSplitsMetaInfoFile()), + LocalResourceType.FILE, + LocalResourceVisibility.APPLICATION, + metaInfoFileStatus.getLen(), +
metaInfoFileStatus.getModificationTime())); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Mar 4 18:17:39 2016 @@ -19,13 +19,14 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; +import org.apache.commons.lang.ArrayUtils; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; @@ -37,6 +38,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.builtin.RoundRobinPartitioner; import org.apache.pig.data.DataType; import org.apache.pig.data.TupleFactory; @@ -123,19 +127,96 @@ public class TezCompilerUtil { static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException { plan.connect(from, to); - if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) { - PhysicalOperator leaf = from.plan.getLeaves().get(0); - // It could be POStoreTez incase of sampling job in order by - if (leaf instanceof POLocalRearrangeTez) { - POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf; - lr.setOutputKey(to.getOperatorKey().toString()); - } - } + // Add edge descriptors to old and new operators to.inEdges.put(from.getOperatorKey(), edge); from.outEdges.put(to.getOperatorKey(), edge); } + static public void connectTezOpToNewPredecessor(TezOperPlan plan, + TezOperator tezOp, TezOperator newPredecessor, + TezEdgeDescriptor edge, String oldInputKey) throws PlanException { + plan.connect(newPredecessor, tezOp); + // Add edge descriptors to old and new operators + tezOp.inEdges.put(newPredecessor.getOperatorKey(), edge); + newPredecessor.outEdges.put(tezOp.getOperatorKey(), edge); + + if (oldInputKey != null) { + replaceInput(tezOp, oldInputKey, newPredecessor.getOperatorKey().toString()); + } + } + + public static void replaceInput(TezOperator tezOp, String oldInputKey, + String newInputKey) throws PlanException { + try { + List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class); + for (TezInput input : inputs) { + input.replaceInput(oldInputKey, newInputKey); + } + List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class); + for (POUserFunc userFunc : userFuncs) { + if (userFunc.getFunc() instanceof ReadScalarsTez) { + TezInput input = 
(TezInput)userFunc.getFunc(); + input.replaceInput(oldInputKey, newInputKey); + userFunc.getFuncSpec().setCtorArgs(input.getTezInputs()); + } + } + } catch (VisitorException e) { + throw new PlanException(e); + } + } + + static public void connectTezOpToNewSuccesor(TezOperPlan plan, + TezOperator tezOp, TezOperator newSuccessor, + TezEdgeDescriptor edge, String oldOutputKey) throws PlanException { + plan.connect(tezOp, newSuccessor); + // Add edge descriptors to old and new operators + newSuccessor.inEdges.put(tezOp.getOperatorKey(), edge); + tezOp.outEdges.put(newSuccessor.getOperatorKey(), edge); + + if (oldOutputKey != null) { + replaceOutput(tezOp, oldOutputKey, newSuccessor.getOperatorKey().toString()); + } + } + + public static void replaceOutput(TezOperator tezOp, String oldOutputKey, + String newOutputKey) throws PlanException { + try { + List<TezOutput> tezOutputs = PlanHelper.getPhysicalOperators(tezOp.plan, + TezOutput.class); + for (TezOutput tezOut : tezOutputs) { + if (ArrayUtils.contains(tezOut.getTezOutputs(), oldOutputKey)) { + tezOut.replaceOutput(oldOutputKey, newOutputKey); + } + } + } catch (VisitorException e) { + throw new PlanException(e); + } + } + + public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException { + try { + List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class); + for (TezInput input : inputs) { + if (ArrayUtils.contains(input.getTezInputs(), inputKey)) { + return true; + } + } + List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class); + for (POUserFunc userFunc : userFuncs) { + if (userFunc.getFunc() instanceof ReadScalarsTez) { + TezInput input = (TezInput)userFunc.getFunc(); + if (ArrayUtils.contains(input.getTezInputs(), inputKey)) { + return true; + } + } + } + return false; + } catch (VisitorException e) { + throw new PlanException(e); + } + } + static public POForEach getForEach(POProject project, int rp, String scope, 
NodeIdGenerator nig) { PhysicalPlan forEachPlan = new PhysicalPlan(); forEachPlan.add(project); @@ -192,19 +273,4 @@ public class TezCompilerUtil { edge.setIntermediateOutputValueClass(TUPLE_CLASS); } - /** - * Returns true if there are no loads or stores in a TezOperator. - * To be called only after LoaderProcessor is called - */ - static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException { - boolean intermediateReducer = false; - LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class); - // Not map and not final reducer - if (stores.size() <= 0 && - (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) { - intermediateReducer = true; - } - return intermediateReducer; - } - }
