Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Mar 4 18:17:39 2016 @@ -92,40 +92,55 @@ public class TezSessionManager { adjustAMConfig(amConf, tezJobConf); String jobName = conf.get(PigContext.JOB_NAME, "pig"); TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds); - tezClient.start(); - TezAppMasterStatus appMasterStatus = tezClient.getAppMasterStatus(); - if (appMasterStatus.equals(TezAppMasterStatus.SHUTDOWN)) { - throw new RuntimeException("TezSession has already shutdown"); + try { + tezClient.start(); + TezAppMasterStatus appMasterStatus = tezClient.getAppMasterStatus(); + if (appMasterStatus.equals(TezAppMasterStatus.SHUTDOWN)) { + throw new RuntimeException("TezSession has already shutdown"); + } + tezClient.waitTillReady(); + } catch (Throwable e) { + log.error("Exception while waiting for Tez client to be ready", e); + tezClient.stop(); + throw new RuntimeException(e); } - tezClient.waitTillReady(); return new SessionInfo(tezClient, requestedAMResources); } private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) { int requiredAMMaxHeap = -1; int requiredAMResourceMB = -1; - int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get( + String amLaunchOpts = amConf.get( TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, - TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT)); + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT); + int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amLaunchOpts); int configuredAMResourceMB = amConf.getInt( TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT); if (tezJobConf.getEstimatedTotalParallelism() > 0) { - int minAMMaxHeap = 3584; + // Need more room for native memory/virtual address space + // when close to 4G due to 32-bit jvm 4G limit + int minAMMaxHeap = 3200; int minAMResourceMB = 4096; // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb - // Increment by 512 mb for every additional 5K tasks. + // Increment container size by 512 mb for every additional 5K tasks. + // 30000 and above - 3200Xmx, 4096 (896 native memory) + // 25000 and above - 3072Xmx, 3584 + // 20000 and above - 2560Xmx, 3072 + // 15000 and above - 2048Xmx, 2560 + // 10000 and above - 1536Xmx, 2048 + // 5000 and above - 1024Xmx, 1536 (512 native memory) for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) { - if (tezJobConf.getEstimatedTotalParallelism() > taskCount) { + if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) { requiredAMMaxHeap = minAMMaxHeap; requiredAMResourceMB = minAMResourceMB; break; } - minAMMaxHeap = minAMMaxHeap - 512; minAMResourceMB = minAMResourceMB - 512; + minAMMaxHeap = minAMResourceMB - 512; } if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) { @@ -139,13 +154,14 @@ public class TezSessionManager { if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) { amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, - amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS) - + " -Xmx" + requiredAMMaxHeap + "M"); + amLaunchOpts + " -Xmx" + requiredAMMaxHeap + "M"); log.info("Increasing Tez AM Heap Size from " + configuredAMMaxHeap + "M to " + requiredAMMaxHeap + "M as the number of total estimated tasks is " + tezJobConf.getEstimatedTotalParallelism()); + log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now " + + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)); } } }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Mar 4 18:17:39 2016 @@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -34,6 +35,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; +import org.apache.hadoop.util.StringUtils; import org.apache.pig.CollectableLoadFunc; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; @@ -95,6 +97,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez; @@ -170,6 +173,7 @@ public class TezCompiler extends PhyPlan private int fileConcatenationThreshold = 100; private boolean optimisticFileConcatenation = false; + private List<String> readOnceLoadFuncs = null; private POLocalRearrangeTezFactory localRearrangeFactory; @@ -200,6 +204,12 @@ public class TezCompiler extends PhyPlan OPTIMISTIC_FILE_CONCATENATION, "false").equals("true"); LOG.info("File concatenation threshold: " + fileConcatenationThreshold + " optimistic? " + optimisticFileConcatenation); + + String loadFuncs = pigContext.getProperties().getProperty( + PigConfiguration.PIG_SORT_READONCE_LOADFUNCS); + if (loadFuncs != null && loadFuncs.trim().length() > 0) { + readOnceLoadFuncs = Arrays.asList(StringUtils.split(loadFuncs.trim())); + } } public TezOperPlan getTezPlan() { @@ -284,17 +294,20 @@ public class TezCompiler extends PhyPlan FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString()); userFunc.setFuncSpec(newSpec); + //Remove unused store filename + if (userFunc.getInputs().size() == 2) { + userFunc.getInputs().remove(1); + } + if (storeSeen.containsKey(store)) { storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString()); } else { POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope)); + output.setScalarOutput(true); output.addOutputKey(tezOp.getOperatorKey().toString()); from.plan.remove(from.plan.getOperator(store.getOperatorKey())); from.plan.addAsLeaf(output); storeSeen.put(store, output); - - //Remove unused store filename - userFunc.getInputs().remove(1); } if (tezPlan.getPredecessors(tezOp)==null || !tezPlan.getPredecessors(tezOp).contains(from)) { @@ -350,52 +363,18 @@ public class TezCompiler extends PhyPlan String msg = "Predecessor of load should be a store or native oper. Got " + p.getClass(); throw new PlanException(msg, errCode, PigException.BUG); } - if (p instanceof POStore) { - PhysicalOperator store = oper.plan.getOperator(p.getOperatorKey()); - // replace POStore to POValueOutputTez, convert the tezOperator to splitter - oper.plan.disconnect(oper.plan.getPredecessors(store).get(0), store); - oper.plan.remove(store); - POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope))); - oper.plan.addAsLeaf(valueOutput); - oper.setSplitter(true); - - // Create a splittee of store only - TezOperator storeOnlyTezOperator = getTezOp(); - PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan(); - POValueInputTez valueInput = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope))); - valueInput.setInputKey(oper.getOperatorKey().toString()); - storeOnlyPhyPlan.addAsLeaf(valueInput); - storeOnlyPhyPlan.addAsLeaf(store); - storeOnlyTezOperator.plan = storeOnlyPhyPlan; - tezPlan.add(storeOnlyTezOperator); - phyToTezOpMap.put(p, storeOnlyTezOperator); - - // Create new operator as second splittee - curTezOp = getTezOp(); - POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope))); - valueInput2.setInputKey(oper.getOperatorKey().toString()); - curTezOp.plan.add(valueInput2); - tezPlan.add(curTezOp); - - // Connect splitter to splittee - TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, oper, storeOnlyTezOperator); - TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE); - storeOnlyTezOperator.setRequestedParallelismByReference(oper); - - edge = TezCompilerUtil.connect(tezPlan, oper, curTezOp); - TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.ONE_TO_ONE); - curTezOp.setRequestedParallelismByReference(oper); - } else if (p instanceof PONative) { - // Need new operator - curTezOp = getTezOp(); - curTezOp.plan.add(op); - tezPlan.add(curTezOp); - - plan.disconnect(op, p); - TezCompilerUtil.connect(tezPlan, oper, curTezOp); - phyToTezOpMap.put(op, curTezOp); - return; + curTezOp = getTezOp(); + curTezOp.plan.add(op); + curTezOp.setUseMRMapSettings(true); + if (((POLoad) op).getLFile() != null + && ((POLoad) op).getLFile().getFuncSpec() != null) { + curTezOp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString()); } + tezPlan.add(curTezOp); + phyToTezOpMap.put(op, curTezOp); + plan.disconnect(op, p); + TezCompilerUtil.connect(tezPlan, oper, curTezOp); + oper.segmentBelow = true; return; } @@ -640,6 +619,7 @@ public class TezCompiler extends PhyPlan public void visitCounter(POCounter op) throws VisitorException { // Refer visitRank(PORank) for more details try{ + curTezOp.markRankCounter(); POCounterTez counterTez = new POCounterTez(op); nonBlocking(counterTez); phyToTezOpMap.put(op, curTezOp); @@ -685,6 +665,7 @@ public class TezCompiler extends PhyPlan clr.setDistinct(true); combinePlan.addAsLeaf(clr); + curTezOp.markDistinct(); addDistinctPlan(curTezOp.plan, op.getRequestedParallelism()); curTezOp.setRequestedParallelism(op.getRequestedParallelism()); phyToTezOpMap.put(op, curTezOp); @@ -1134,7 +1115,6 @@ public class TezCompiler extends PhyPlan public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException { try{ - joinOp.setEndOfRecordMark(POStatus.STATUS_NULL); if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){ int errCode=1101; throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode); @@ -1476,13 +1456,26 @@ public class TezCompiler extends PhyPlan if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE)) { heapPerc = Float.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE)); } + long totalMemory = -1; + if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM)) { + totalMemory = Long.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM)); + } POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)), - -1, sampleRate, heapPerc); + -1, sampleRate, heapPerc, totalMemory); + + TezOperator samplerOper = compiledInputs[0]; + boolean writeDataForPartitioner = shouldWriteDataForPartitioner(samplerOper); + + PhysicalPlan partitionerPlan = null; + if (writeDataForPartitioner) { + samplerOper.plan.addAsLeaf(lrTez); + } else { + partitionerPlan = samplerOper.plan.clone(); + partitionerPlan.addAsLeaf(lrTez); + } - TezOperator prevOp = compiledInputs[0]; - prevOp.plan.addAsLeaf(lrTez); - prevOp.plan.addAsLeaf(poSample); - prevOp.markSampler(); + samplerOper.plan.addAsLeaf(poSample); + samplerOper.markSampler(); MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans(); List<PhysicalOperator> l = plan.getPredecessors(op); @@ -1526,9 +1519,9 @@ public class TezCompiler extends PhyPlan // This foreach will pick the sort key columns from the POPoissonSample output POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps1, flat1); - prevOp.plan.addAsLeaf(nfe1); - prevOp.plan.addAsLeaf(lrTezSample); - prevOp.setClosed(true); + samplerOper.plan.addAsLeaf(nfe1); + samplerOper.plan.addAsLeaf(lrTezSample); + samplerOper.setClosed(true); int rp = op.getRequestedParallelism(); if (rp == -1) { @@ -1551,10 +1544,9 @@ public class TezCompiler extends PhyPlan compiledInputs = new TezOperator[] {joinInputs[0]}; - blocking(); - - // Add a POIdentityInOutTez to the joinJobs[0] which is a partition vertex. - // It just partitions the data from first vertex based on the quantiles from sample vertex. + // Add a partitioner vertex that partitions the data based on the quantiles from sample vertex. + curTezOp = getTezOp(); + tezPlan.add(curTezOp); joinJobs[0] = curTezOp; try { @@ -1572,15 +1564,38 @@ public class TezCompiler extends PhyPlan } lrTez.setKeyType(type); lrTez.setPlans(groups); - lrTez.setSkewedJoin(true); lrTez.setResultType(DataType.TUPLE); - POIdentityInOutTez identityInOutTez = new POIdentityInOutTez( - OperatorKey.genOpKey(scope), lrTez); - identityInOutTez.setInputKey(prevOp.getOperatorKey().toString()); - joinJobs[0].plan.addAsLeaf(identityInOutTez); + POLocalRearrangeTez partitionerLR = null; + if (!writeDataForPartitioner) { + // Read input from hdfs again + joinJobs[0].plan = partitionerPlan; + partitionerLR = lrTez; + lrTez.setSkewedJoin(true); + } else { + // Add a POIdentityInOutTez which just passes data through from sampler vertex + partitionerLR = new POIdentityInOutTez( + OperatorKey.genOpKey(scope), + lrTez, + samplerOper.getOperatorKey().toString()); + partitionerLR.setSkewedJoin(true); + joinJobs[0].plan.addAsLeaf(partitionerLR); + + // Connect the sampler vertex to the partitioner vertex + lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString()); + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, samplerOper, joinJobs[0]); + // TODO: PIG-3775 unsorted shuffle + edge.dataMovementType = DataMovementType.ONE_TO_ONE; + edge.outputClassName = UnorderedKVOutput.class.getName(); + edge.inputClassName = UnorderedKVInput.class.getName(); + // If prevOp.requestedParallelism changes based on no. of input splits + // it will reflect for joinJobs[0] so that 1-1 edge will work. + joinJobs[0].setRequestedParallelismByReference(samplerOper); + } joinJobs[0].setClosed(true); joinJobs[0].markSampleBasedPartitioner(); + joinJobs[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings()); + rearrangeOutputs[0] = joinJobs[0]; compiledInputs = new TezOperator[] {joinInputs[1]}; @@ -1633,6 +1648,7 @@ public class TezCompiler extends PhyPlan List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>(); List<Boolean> flat = new ArrayList<Boolean>(); + boolean containsRightOuter = false; // Add corresponding POProjects for (int i=0; i < 2; i++) { ep = new PhysicalPlan(); @@ -1643,8 +1659,13 @@ public class TezCompiler extends PhyPlan ep.add(prj); eps.add(ep); if (!inner[i]) { - // Add an empty bag for outer join - CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i)); + // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez UDF as well + if (i == 0) { + containsRightOuter = true; + CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName()); + } else { + CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKeyTez.class.getName()); + } } flat.add(true); } @@ -1655,36 +1676,30 @@ public class TezCompiler extends PhyPlan fe.visit(this); // Connect vertices - lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString()); lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString()); - identityInOutTez.setOutputKey(joinJobs[2].getOperatorKey().toString()); + partitionerLR.setOutputKey(joinJobs[2].getOperatorKey().toString()); pr.setOutputKey(joinJobs[2].getOperatorKey().toString()); - TezEdgeDescriptor edge = joinJobs[0].inEdges.get(prevOp.getOperatorKey()); - joinJobs[0].setUseMRMapSettings(prevOp.isUseMRMapSettings()); - // TODO: Convert to unsorted shuffle after TEZ-661 - // Use 1-1 edge - edge.dataMovementType = DataMovementType.ONE_TO_ONE; - edge.outputClassName = UnorderedKVOutput.class.getName(); - edge.inputClassName = UnorderedKVInput.class.getName(); - // If prevOp.requestedParallelism changes based on no. of input splits - // it will reflect for joinJobs[0] so that 1-1 edge will work. - joinJobs[0].setRequestedParallelismByReference(prevOp); - - TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first); + TezCompilerUtil.connect(tezPlan, samplerOper, sampleJobPair.first); POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0); - for (int i = 0; i < 2; i++) { - joinJobs[i].setSampleOperator(sampleJobPair.first); - - // Configure broadcast edges for distribution map - edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]); - TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); - sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString()); + for (int i = 0; i <= 2; i++) { + if (i != 2 || containsRightOuter) { + // We need to send sample to left relation partitioner vertex, right relation load vertex, + // and join vertex (IsFirstReduceOfKey in join vertex need sample file as well) + joinJobs[i].setSampleOperator(sampleJobPair.first); + + // Configure broadcast edges for distribution map + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]); + TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString()); + } // Configure skewed partitioner for join - edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey()); - edge.partitionerClass = SkewedPartitionerTez.class; + if (i != 2) { + TezEdgeDescriptor edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey()); + edge.partitionerClass = SkewedPartitionerTez.class; + } } joinJobs[2].markSkewedJoin(); @@ -1712,55 +1727,79 @@ public class TezCompiler extends PhyPlan new FuncSpec(Utils.getTmpFileCompressorName(pigContext))); } + private boolean shouldWriteDataForPartitioner(TezOperator samplerOper) { + // If there are operators other than load and foreach (like filter + // split, etc) in the plan, then process and write the data out + // and save the cost of processing in the partitioner vertex + // Else read from hdfs again save the IO cost of the extra write + boolean writeDataForPartitioner = false; + if (samplerOper.plan.getRoots().get(0) instanceof POLoad) { + for (PhysicalOperator oper : samplerOper.plan) { + if (oper instanceof POForEach) { + continue; + } else if (oper instanceof POLoad && oper.getInputs() == null) { + // TODO: oper.getInputs() is not null in case of PONative and + // clone needs to be fixed in that case. e2e test - Native_2. + String loadFunc = ((POLoad) oper).getLoadFunc().getClass().getName(); + // We do not want to read all data again from hbase/accumulo for sampling. + if (readOnceLoadFuncs == null || !readOnceLoadFuncs.contains(loadFunc)) { + continue; + } + } + writeDataForPartitioner = true; + break; + } + } else { + writeDataForPartitioner = true; + } + return writeDataForPartitioner; + } + /** - * Force an end to the current vertex with store and sample - * @return Tez operator that now is finished with a store. - * @throws PlanException + * Get LocalRearrange for POSort input */ - private TezOperator endSingleInputWithStoreAndSample( - POSort sort, - POLocalRearrangeTez lr, - POLocalRearrangeTez lrSample, - byte keyType, - Pair<POProject, Byte>[] fields) throws PlanException { - if(compiledInputs.length>1) { - int errCode = 2023; - String msg = "Received a multi input plan when expecting only a single input one."; - throw new PlanException(msg, errCode, PigException.BUG); + private POLocalRearrangeTez getLocalRearrangeForSortInput(POSort sort, + byte keyType, Pair<POProject, Byte>[] fields) + throws PlanException { + POLocalRearrangeTez lr = new POLocalRearrangeTez(OperatorKey.genOpKey(scope)); + List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>(); + if (fields == null) { + // This is project * + PhysicalPlan ep = new PhysicalPlan(); + POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prj.setStar(true); + prj.setOverloaded(false); + prj.setResultType(DataType.TUPLE); + ep.add(prj); + eps.add(ep); + } else { + // Attach the sort plans to the local rearrange to get the + // projection. + eps.addAll(sort.getSortPlans()); } - TezOperator oper = compiledInputs[0]; - if (!oper.isClosed()) { - List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>(); - if (fields == null) { - // This is project * - PhysicalPlan ep = new PhysicalPlan(); - POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); - prj.setStar(true); - prj.setOverloaded(false); - prj.setResultType(DataType.TUPLE); - ep.add(prj); - eps.add(ep); - } else { - // Attach the sort plans to the local rearrange to get the - // projection. - eps.addAll(sort.getSortPlans()); - } + try { + lr.setIndex(0); + } catch (ExecException e) { + int errCode = 2058; + String msg = "Unable to set index on newly created POLocalRearrange."; + throw new PlanException(msg, errCode, PigException.BUG, e); + } + lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType); + lr.setPlans(eps); + lr.setResultType(DataType.TUPLE); + lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations()); + return lr; + } - try { - lr.setIndex(0); - } catch (ExecException e) { - int errCode = 2058; - String msg = "Unable to set index on newly created POLocalRearrange."; - throw new PlanException(msg, errCode, PigException.BUG, e); - } - lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType); - lr.setPlans(eps); - lr.setResultType(DataType.TUPLE); - lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations()); + /** + * Add a sampler to the sort input + */ + private POLocalRearrangeTez addSamplingToSortInput(POSort sort, TezOperator oper, + byte keyType, Pair<POProject, Byte>[] fields) throws PlanException { - lr.setOutputKey(curTezOp.getOperatorKey().toString()); - oper.plan.addAsLeaf(lr); + POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL); + if (!oper.isClosed()) { List<Boolean> flat1 = new ArrayList<Boolean>(); List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>(); @@ -1861,7 +1900,8 @@ public class TezCompiler extends PhyPlan String msg = "The current operator is closed. This is unexpected while compiling."; throw new PlanException(msg, errCode, PigException.BUG); } - return oper; + oper.markSampler(); + return lrSample; } private Pair<TezOperator,Integer> getOrderbySamplingAggregationJob( @@ -2098,32 +2138,42 @@ public class TezCompiler extends PhyPlan private TezOperator[] getSortJobs( TezOperator inputOper, + PhysicalPlan partitionerPlan, POLocalRearrangeTez inputOperRearrange, POSort sort, byte keyType, Pair<POProject, Byte>[] fields) throws PlanException{ + TezOperator[] opers = new TezOperator[2]; + + // Partitioner Vertex TezOperator oper1 = getTezOp(); tezPlan.add(oper1); opers[0] = oper1; - - POIdentityInOutTez identityInOutTez = new POIdentityInOutTez( - OperatorKey.genOpKey(scope), - inputOperRearrange); - identityInOutTez.setInputKey(inputOper.getOperatorKey().toString()); - oper1.plan.addAsLeaf(identityInOutTez); + POLocalRearrangeTez partitionerLR = null; + if (partitionerPlan != null) { + // Read from hdfs again + oper1.plan = partitionerPlan; + partitionerLR = inputOperRearrange; + } else { + partitionerLR = new POIdentityInOutTez( + OperatorKey.genOpKey(scope), + inputOperRearrange, + inputOper.getOperatorKey().toString()); + oper1.plan.addAsLeaf(partitionerLR); + } oper1.setClosed(true); oper1.markSampleBasedPartitioner(); + // Global Sort Vertex TezOperator oper2 = getTezOp(); + partitionerLR.setOutputKey(oper2.getOperatorKey().toString()); oper2.markGlobalSort(); opers[1] = oper2; tezPlan.add(oper2); long limit = sort.getLimit(); - boolean[] sortOrder; - List<Boolean> sortOrderList = sort.getMAscCols(); if(sortOrderList != null) { sortOrder = new boolean[sortOrderList.size()]; @@ -2133,8 +2183,6 @@ public class TezCompiler extends PhyPlan oper2.setSortOrder(sortOrder); } - identityInOutTez.setOutputKey(oper2.getOperatorKey().toString()); - if (limit!=-1) { POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope)); pkg_c.setPkgr(new LitePackager()); @@ -2214,6 +2262,13 @@ public class TezCompiler extends PhyPlan @Override public void visitSort(POSort op) throws VisitorException { try{ + + if (compiledInputs.length > 1) { + int errCode = 2023; + String msg = "Received a multi input plan when expecting only a single input one."; + throw new PlanException(msg, errCode, PigException.BUG); + } + Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans()); byte keyType = DataType.UNKNOWN; @@ -2228,58 +2283,65 @@ public class TezCompiler extends PhyPlan throw new PlanException(msg, errCode, PigException.BUG, ve); } - POLocalRearrangeTez lr = new POLocalRearrangeTez(OperatorKey.genOpKey(scope)); - POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL); + TezOperator samplerOper = compiledInputs[0]; + boolean writeDataForPartitioner = shouldWriteDataForPartitioner(samplerOper); - TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields); - prevOper.markSampler(); + POLocalRearrangeTez lr = getLocalRearrangeForSortInput(op, keyType, fields); + PhysicalPlan partitionerPlan = null; + if (writeDataForPartitioner) { + samplerOper.plan.addAsLeaf(lr); + } else { + partitionerPlan = samplerOper.plan.clone(); + partitionerPlan.addAsLeaf(lr); + } + // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp int rp = op.getRequestedParallelism(); if (rp == -1) { rp = pigContext.defaultParallel; } - // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp + // Add sampling to sort input. Create a sample aggregation operator and connect both + POLocalRearrangeTez lrSample = addSamplingToSortInput(op, samplerOper, keyType, fields); Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp); - TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields); + TezCompilerUtil.connect(tezPlan, samplerOper, quantJobParallelismPair.first); - TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]); - sortOpers[0].setUseMRMapSettings(prevOper.isUseMRMapSettings()); + // Create the partitioner and the global sort vertices + TezOperator[] sortOpers = getSortJobs(samplerOper, partitionerPlan, lr, op, keyType, fields); + sortOpers[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings()); + + if (writeDataForPartitioner) { + // Connect the sampler and partitioner vertex + lr.setOutputKey(sortOpers[0].getOperatorKey().toString()); + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, samplerOper, sortOpers[0]); - // Use 1-1 edge - edge.dataMovementType = DataMovementType.ONE_TO_ONE; - edge.outputClassName = UnorderedKVOutput.class.getName(); - edge.inputClassName = UnorderedKVInput.class.getName(); - // If prevOper.requestedParallelism changes based on no. of input splits - // it will reflect for sortOpers[0] so that 1-1 edge will work. - sortOpers[0].setRequestedParallelismByReference(prevOper); - if (rp==-1) { - quantJobParallelismPair.first.setNeedEstimatedQuantile(true); + // Use 1-1 edge + edge.dataMovementType = DataMovementType.ONE_TO_ONE; + edge.outputClassName = UnorderedKVOutput.class.getName(); + edge.inputClassName = UnorderedKVInput.class.getName(); + // If prevOper.requestedParallelism changes based on no. of input splits + // it will reflect for sortOpers[0] so that 1-1 edge will work. + sortOpers[0].setRequestedParallelismByReference(samplerOper); } - sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second); - /* - // TODO: Convert to unsorted shuffle after TEZ-661 - // edge.outputClassName = UnorderedKVOutput.class.getName(); - // edge.inputClassName = UnorderedKVInput.class.getName(); - edge.partitionerClass = RoundRobinPartitioner.class; - sortOpers[0].setRequestedParallelism(quantJobParallelismPair.second); + if (rp == -1) { + quantJobParallelismPair.first.setNeedEstimatedQuantile(true); + } + quantJobParallelismPair.first.setSortOperator(sortOpers[1]); sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second); - */ - TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first); - lr.setOutputKey(sortOpers[0].getOperatorKey().toString()); - lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString()); - - edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]); + // Broadcast the sample to Partitioner vertex + TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]); TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); POValueOutputTez sampleOut = (POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0); sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString()); sortOpers[0].setSampleOperator(quantJobParallelismPair.first); + lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString()); + + // Connect the Partitioner and Global Sort vertex edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]); edge.partitionerClass = WeightedRangePartitionerTez.class; - curTezOp = sortOpers[1]; // TODO: Review sort udf @@ -2287,7 +2349,6 @@ public class TezCompiler extends PhyPlan // curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString()); // curTezOp.isUDFComparatorUsed = true; // } - quantJobParallelismPair.first.setSortOperator(sortOpers[1]); // If Order by followed by Limit and parallelism of order by is not 1 // add a new vertex for Limit with parallelism 1. Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Mar 4 18:17:39 2016 @@ -17,6 +17,8 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.plan; +import java.io.Serializable; + import org.apache.hadoop.mapreduce.Partitioner; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -28,9 +30,9 @@ import org.apache.tez.runtime.library.ou /** * Descriptor for Tez edge. It holds combine plan as well as edge properties. */ -public class TezEdgeDescriptor { +public class TezEdgeDescriptor implements Serializable { // Combiner runs on both input and output of Tez edge. - public PhysicalPlan combinePlan; + transient public PhysicalPlan combinePlan; public String inputClassName; public String outputClassName; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Mar 4 18:17:39 2016 @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager; import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; @@ -50,11 +51,18 @@ public class TezOperPlan extends Operato private static final long serialVersionUID = 1L; - private Map<String, Path> extraResources = new HashMap<String, Path>(); + private transient Map<String, Path> extraResources = new HashMap<String, Path>(); private int estimatedTotalParallelism = -1; + private transient Credentials creds; + public TezOperPlan() { + creds = new Credentials(); + } + + public Credentials getCredentials() { + return creds; } public int getEstimatedTotalParallelism() { @@ -146,7 +154,7 @@ public class TezOperPlan extends Operato URI resourceURI = new URI(fileName); String fragment = resourceURI.getFragment(); - Path remoteFsPath = new Path(resourceURI.getPath()); + Path remoteFsPath = new Path(resourceURI); String resourceName = (fragment != null && fragment.length() > 0) ? fragment : remoteFsPath.getName(); addExtraResource(resourceName, remoteFsPath); @@ -185,10 +193,8 @@ public class TezOperPlan extends Operato public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException { List<TezOperator> list = new ArrayList<TezOperator>(); list.add(root); - int prevSize = 0; int pos = 0; - while (list.size() > prevSize) { - prevSize = list.size(); + while (list.size() > pos) { TezOperator node = list.get(pos); if (getSuccessors(node)!=null) { for (TezOperator succ : getSuccessors(node)) { @@ -234,5 +240,47 @@ public class TezOperPlan extends Operato super.remove(node); } } + + // This method is used in PigGraceShuffleVertexManager to get a list of grandparents. + // Also need to exclude grandparents which also a parent (a is both parent and grandparent in the diagram below) + // a -> c + // \ b / + // + public static List<TezOperator> getGrandParentsForGraceParallelism(TezOperPlan tezPlan, TezOperator op) { + List<TezOperator> grandParents = new ArrayList<TezOperator>(); + List<TezOperator> preds = tezPlan.getPredecessors(op); + if (preds != null) { + for (TezOperator pred : preds) { + if (pred.isVertexGroup()) { + grandParents.clear(); + return grandParents; + } + List<TezOperator> predPreds = tezPlan.getPredecessors(pred); + if (predPreds!=null) { + for (TezOperator predPred : predPreds) { + if (predPred.isVertexGroup()) { + grandParents.clear(); + return grandParents; + } + if (!grandParents.contains(predPred)) { + grandParents.add(predPred); + } + } + } else { + grandParents.clear(); + break; + } + } + + if (!grandParents.isEmpty()) { + for (TezOperator pred : preds) { + if (grandParents.contains(pred)) { + grandParents.remove(pred); + } + } + } + } + return grandParents; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Mar 4 18:17:39 2016 @@ -18,11 +18,16 @@ package org.apache.pig.backend.hadoop.executionengine.tez.plan; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -30,6 +35,8 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.Operator; @@ -50,7 +57,11 @@ public class TezOperator extends Operato private static final long serialVersionUID = 1L; // Processor pipeline - public PhysicalPlan plan; + // Note TezOperator needs to be serialized and de-serialized to + // be used in PigGraceShuffleVertexManager, some fields are either + // big, or not serializable, and not in use in PigGraceShuffleVertexManager, + // mark them as transient: plan, vertexGroupInfo, inputSplitInfo + public transient PhysicalPlan plan; // Descriptors for out-bound edges. public Map<OperatorKey, TezEdgeDescriptor> outEdges; @@ -133,6 +144,12 @@ public class TezOperator extends Operato private boolean useMRMapSettings = false; + private boolean useGraceParallelism = false; + + private Map<OperatorKey, Double> parallelismFactorPerSuccessor; + + private Boolean intermediateReducer = null; + // Types of blocking operators. For now, we only support the following ones. public static enum OPER_FEATURE { // Indicate if this job is a merge indexer @@ -159,8 +176,12 @@ public class TezOperator extends Operato LIMIT_AFTER_SORT, // Indicate if this job is a union job UNION, + // Indicate if this job is a distinct job + DISTINCT, // Indicate if this job is a native job - NATIVE; + NATIVE, + // Indicate if this job does rank counter + RANK_COUNTER; }; // Features in the job/vertex. Mostly will be only one feature. @@ -170,16 +191,17 @@ public class TezOperator extends Operato private List<OperatorKey> vertexGroupMembers; // For union - private VertexGroupInfo vertexGroupInfo; + private transient VertexGroupInfo vertexGroupInfo; // Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator private Map<OperatorKey, OperatorKey> vertexGroupStores = null; + private boolean isVertexGroup = false; - public static class LoaderInfo { + public static class LoaderInfo implements Serializable { private List<POLoad> loads = null; private ArrayList<FileSpec> inp = new ArrayList<FileSpec>(); private ArrayList<String> inpSignatureLists = new ArrayList<String>(); private ArrayList<Long> inpLimits = new ArrayList<Long>(); - private InputSplitInfo inputSplitInfo = null; + private transient InputSplitInfo inputSplitInfo = null; public List<POLoad> getLoads() { return loads; } @@ -262,9 +284,10 @@ public class TezOperator extends Operato this.estimatedParallelism = estimatedParallelism; } - public int getEffectiveParallelism() { + public int getEffectiveParallelism(int defaultParallelism) { // PIG-4162: For intermediate reducers, use estimated parallelism over user set parallelism. - return getEstimatedParallelism() == -1 ? getRequestedParallelism() + return getEstimatedParallelism() == -1 + ? (getRequestedParallelism() == -1 ? defaultParallelism : getRequestedParallelism()) : getEstimatedParallelism(); } @@ -405,6 +428,14 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.UNION.ordinal()); } + public boolean isDistinct() { + return feature.get(OPER_FEATURE.DISTINCT.ordinal()); + } + + public void markDistinct() { + feature.set(OPER_FEATURE.DISTINCT.ordinal()); + } + public boolean isNative() { return feature.get(OPER_FEATURE.NATIVE.ordinal()); } @@ -413,6 +444,14 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.NATIVE.ordinal()); } + public boolean isRankCounter() { + return feature.get(OPER_FEATURE.RANK_COUNTER.ordinal()); + } + + public void markRankCounter() { + feature.set(OPER_FEATURE.RANK_COUNTER.ordinal()); + } + public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) { for (OPER_FEATURE opf : OPER_FEATURE.values()) { if (excludeFeatures != null && excludeFeatures.contains(opf)) { @@ -440,7 +479,7 @@ public class TezOperator extends Operato this.useSecondaryKey = useSecondaryKey; } - public List<OperatorKey> getUnionPredecessors() { + public List<OperatorKey> getUnionMembers() { return vertexGroupMembers; } @@ -462,7 +501,7 @@ public class TezOperator extends Operato // Union is the only operator that uses alias vertex (VertexGroup) now. But // more operators could be added to the list in the future. public boolean isVertexGroup() { - return vertexGroupInfo != null; + return isVertexGroup; } public VertexGroupInfo getVertexGroupInfo() { @@ -471,6 +510,7 @@ public class TezOperator extends Operato public void setVertexGroupInfo(VertexGroupInfo vertexGroup) { this.vertexGroupInfo = vertexGroup; + this.isVertexGroup = true; } public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) { @@ -480,6 +520,16 @@ public class TezOperator extends Operato this.vertexGroupStores.put(storeKey, vertexGroupKey); } + public void removeVertexGroupStore(OperatorKey vertexGroupKey) { + Iterator<Entry<OperatorKey, OperatorKey>> iter = vertexGroupStores.entrySet().iterator(); + while (iter.hasNext()) { + Entry<OperatorKey, OperatorKey> entry = iter.next(); + if (entry.getValue().equals(vertexGroupKey)) { + iter.remove(); + } + } + } + public Map<OperatorKey, OperatorKey> getVertexGroupStores() { return this.vertexGroupStores; } @@ -496,7 +546,7 @@ public class TezOperator extends Operato public String toString() { StringBuilder sb = new StringBuilder(name() + ":\n"); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - if (!plan.isEmpty()) { + if (plan!=null && !plan.isEmpty()) { plan.explain(baos); String mp = new String(baos.toByteArray()); sb.append(shiftStringByTabs(mp, "| ")); @@ -601,6 +651,52 @@ public class TezOperator extends Operato return loaderInfo; } + public void setUseGraceParallelism(boolean useGraceParallelism) { + this.useGraceParallelism = useGraceParallelism; + } + public boolean isUseGraceParallelism() { + return useGraceParallelism; + } + + public double getParallelismFactor(TezOperator successor) throws VisitorException { + if (parallelismFactorPerSuccessor == null) { + parallelismFactorPerSuccessor = new HashMap<OperatorKey, Double>(); + } + Double factor = parallelismFactorPerSuccessor.get(successor.getOperatorKey()); + if (factor == null) { + // We determine different parallelism factors for different successors (edges). + // For eg: If we have two successors, one with combine plan and other without + // we want to compute lesser parallelism factor for the one with the combine plan + // as that edge will get less data. + // TODO: To be more perfect, we need only look at the split sub-plan that + // writes to that successor edge. If there is a FILTER in one sub-plan it is accounted + // for all the successors now which is not right. + TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(this, successor); + parallelismFactorVisitor.visit(); + factor = parallelismFactorVisitor.getFactor(); + parallelismFactorPerSuccessor.put(successor.getOperatorKey(), factor); + } + return factor; + } + + public Boolean isIntermediateReducer() throws IOException { + if (intermediateReducer == null) { + intermediateReducer = false; + // set intermediateReducer to true if are no loads or stores in a TezOperator + LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(plan, POStore.class); + // Not map and not final reducer + if (stores.size() <= 0 && + (getLoaderInfo().getLoads() == null || getLoaderInfo().getLoads().size() <= 0)) { + intermediateReducer = true; + } + } + return intermediateReducer; + } + + public void setIntermediateReducer(Boolean intermediateReducer) { + this.intermediateReducer = intermediateReducer; + } + public static class VertexGroupInfo { private List<OperatorKey> inputKeys; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Fri Mar 4 18:17:39 2016 @@ -30,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -67,11 +68,13 @@ public class TezPOPackageAnnotator exten List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp); for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) { TezOperator predTezOp = it.next(); + TezOperator predTezOpVertexGrp = null; if (predTezOp.isVertexGroup()) { + predTezOpVertexGrp = predTezOp; // Just get one of the inputs to vertex group predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0)); } - lrFound += patchPackage(predTezOp, pkgTezOp, pkg); + lrFound += patchPackage(predTezOp, predTezOpVertexGrp, pkgTezOp, pkg); if(lrFound == pkg.getNumInps()) { break; } @@ -79,13 +82,19 @@ public class TezPOPackageAnnotator exten if(lrFound != pkg.getNumInps()) { int errCode = 2086; - String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators."; + String msg = "Unexpected problem during optimization. " + + "Could not find all LocalRearrange operators. Expected: " + + pkg.getNumInps() + ", Found: " + lrFound; throw new OptimizerException(msg, errCode, PigException.BUG); } } - private int patchPackage(TezOperator predTezOp, TezOperator pkgTezOp, POPackage pkg) throws VisitorException { - LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(predTezOp.plan, pkgTezOp, pkg); + private int patchPackage(TezOperator predTezOp, + TezOperator predTezOpVertexGrp, + TezOperator pkgTezOp, + POPackage pkg) throws VisitorException { + LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer( + predTezOp.plan, predTezOpVertexGrp, pkgTezOp, pkg); lrDiscoverer.visit(); // let our caller know if we managed to patch // the package @@ -131,13 +140,24 @@ public class TezPOPackageAnnotator exten private int loRearrangeFound = 0; private TezOperator pkgTezOp; private POPackage pkg; + private TezOperator predTezOpVertexGrp; + private boolean isPOSplit; - public LoRearrangeDiscoverer(PhysicalPlan plan, TezOperator pkgTezOp, POPackage pkg) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); + public LoRearrangeDiscoverer(PhysicalPlan predPlan, TezOperator predTezOpVertexGrp, TezOperator pkgTezOp, POPackage pkg) { + super(predPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(predPlan)); this.pkgTezOp = pkgTezOp; this.pkg = pkg; + this.predTezOpVertexGrp = predTezOpVertexGrp; + } + + @Override + public void visitSplit(POSplit spl) throws VisitorException { + isPOSplit = true; + super.visitSplit(spl); } + + @Override public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange; @@ -160,17 +180,24 @@ public class TezPOPackageAnnotator exten if(keyInfo == null) keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); - if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) { - // something is wrong - we should not be getting key info - // for the same index from two different Local Rearranges - int errCode = 2087; - String msg = "Unexpected problem during optimization." + - " Found index:" + lrearrange.getIndex() + - " in multiple LocalRearrange operators."; - throw new OptimizerException(msg, errCode, PigException.BUG); + Integer index = Integer.valueOf(lrearrange.getIndex()); + if(keyInfo.get(index) != null) { + if (isPOSplit) { + // Case of POSplit having more than one input in case of self join or union + loRearrangeFound--; + } else { + // something is wrong - we should not be getting key info + // for the same index from two different Local Rearranges + int errCode = 2087; + String msg = "Unexpected problem during optimization." + + " Found index:" + lrearrange.getIndex() + + " in multiple LocalRearrange operators."; + throw new OptimizerException(msg, errCode, PigException.BUG); + } } - keyInfo.put(Integer.valueOf(lrearrange.getIndex()), + + keyInfo.put(index, new Pair<Boolean, Map<Integer, Integer>>( lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); pkg.getPkgr().setKeyInfo(keyInfo); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Mar 4 18:17:39 2016 @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.plan.PlanException; @@ -152,38 +153,109 @@ public class TezPlanContainer extends Op } public void split(TezPlanContainerNode planNode) throws PlanException { + TezOperPlan tezOperPlan = planNode.getTezOperPlan(); + if (tezOperPlan.size() == 1) { + // Nothing to split. Only one operator in the plan + return; + } + TezOperator operToSegment = null; List<TezOperator> succs = new ArrayList<TezOperator>(); - for (TezOperator tezOper : tezOperPlan) { - if (tezOper.needSegmentBelow() && tezOperPlan.getSuccessors(tezOper)!=null) { - operToSegment = tezOper; - succs.addAll(tezOperPlan.getSuccessors(tezOper)); - break; - } + try { + // Split top down from root to leaves + SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan); + finder.visit(); + operToSegment = finder.getOperatorToSegment(); + } catch (VisitorException e) { + throw new PlanException(e); } - if (operToSegment != null) { + + if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) { + succs.addAll(tezOperPlan.getSuccessors(operToSegment)); for (TezOperator succ : succs) { tezOperPlan.disconnect(operToSegment, succ); - TezOperPlan newOperPlan = new TezOperPlan(); - List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>(); - if (getSuccessors(planNode)!=null) { - containerSuccs.addAll(getSuccessors(planNode)); - } - tezOperPlan.moveTree(succ, newOperPlan); - TezPlanContainerNode newPlanNode = new TezPlanContainerNode(generateNodeOperatorKey(), newOperPlan); - add(newPlanNode); - for (TezPlanContainerNode containerNodeSucc : containerSuccs) { - disconnect(planNode, containerNodeSucc); - connect(newPlanNode, containerNodeSucc); + } + for (TezOperator succ : succs) { + try { + if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) { + // Has already been moved to a new plan by previous successor + // as part of dependency. It could have been further split. + // So walk the full plan to find the new plan and connect + TezOperatorFinder finder = new TezOperatorFinder(this, succ); + finder.visit(); + connect(planNode, finder.getPlanContainerNode()); + continue; + } + TezOperPlan newOperPlan = new TezOperPlan(); + tezOperPlan.moveTree(succ, newOperPlan); + TezPlanContainerNode newPlanNode = new TezPlanContainerNode( + generateNodeOperatorKey(), newOperPlan); + add(newPlanNode); + connect(planNode, newPlanNode); + split(newPlanNode); + if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) { + // On further split, the successor moved to a new plan container. + // Connect to that + TezOperatorFinder finder = new TezOperatorFinder(this, succ); + finder.visit(); + disconnect(planNode, newPlanNode); + connect(planNode, finder.getPlanContainerNode()); + } + } catch (VisitorException e) { + throw new PlanException(e); } - connect(planNode, newPlanNode); - split(newPlanNode); } split(planNode); } } + private static class SegmentOperatorFinder extends TezOpPlanVisitor { + + private TezOperator operToSegment; + + public SegmentOperatorFinder(TezOperPlan plan) { + super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + } + + public TezOperator getOperatorToSegment() { + return operToSegment; + } + + @Override + public void visitTezOp(TezOperator tezOperator) throws VisitorException { + if (tezOperator.needSegmentBelow() && operToSegment == null) { + operToSegment = tezOperator; + } + } + + } + + private static class TezOperatorFinder extends TezPlanContainerVisitor { + + private TezPlanContainerNode planContainerNode; + private TezOperator operatorToFind; + + public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) { + super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan)); + this.operatorToFind = operatorToFind; + } + + public TezPlanContainerNode getPlanContainerNode() { + return planContainerNode; + } + + @Override + public void visitTezPlanContainerNode( + TezPlanContainerNode tezPlanContainerNode) + throws VisitorException { + if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) { + planContainerNode = tezPlanContainerNode; + } + } + + } + private synchronized OperatorKey generateNodeOperatorKey() { OperatorKey opKey = new OperatorKey(jobName + "-" + dagId + "_scope", scopeId); scopeId++; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java Fri Mar 4 18:17:39 2016 @@ -19,7 +19,8 @@ package org.apache.pig.backend.hadoop.ex import java.io.PrintStream; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter.TezGraphPrinter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter.TezDAGGraphPrinter; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter.TezVertexGraphPrinter; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; @@ -37,6 +38,19 @@ public class TezPlanContainerPrinter ext mStream.println("#--------------------------------------------------"); mStream.println("# There are " + planContainer.size() + " DAGs in the session"); mStream.println("#--------------------------------------------------"); + printContainerPlan(planContainer); + } + + private void printContainerPlan(TezPlanContainer planContainer) { + try { + if (planContainer.size() > 1) { + TezDAGGraphPrinter graphPrinter = new TezDAGGraphPrinter(planContainer); + graphPrinter.visit(); + mStream.print(graphPrinter.toString()); + } + } catch (VisitorException e) { + throw new RuntimeException(e); + } } public void setVerbose(boolean verbose) { @@ -48,7 +62,7 @@ public class TezPlanContainerPrinter ext mStream.println("#--------------------------------------------------"); mStream.println("# TEZ DAG plan: " + tezPlanContainerNode.getOperatorKey()); mStream.println("#--------------------------------------------------"); - TezGraphPrinter graphPrinter = new TezGraphPrinter(tezPlanContainerNode.getTezOperPlan()); + TezVertexGraphPrinter graphPrinter = new TezVertexGraphPrinter(tezPlanContainerNode.getTezOperPlan()); graphPrinter.visit(); mStream.print(graphPrinter.toString()); TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getTezOperPlan()); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Mar 4 18:17:39 2016 @@ -55,9 +55,13 @@ public class TezPrinter extends TezOpPla public void visitTezOp(TezOperator tezOper) throws VisitorException { if (tezOper.isVertexGroup()) { VertexGroupInfo info = tezOper.getVertexGroupInfo(); - mStream.println("Tez vertex group " - + tezOper.getOperatorKey().toString() + "\t<-\t " - + info.getInputs() + "\t->\t " + info.getOutput()); + mStream.print("Tez vertex group " + + tezOper.getOperatorKey().toString()); + if (info!=null) { + mStream.println("\t<-\t " + info.getInputs() + "\t->\t " + info.getOutput()); + } else { + mStream.println(); + } mStream.println("# No plan on vertex group"); } else { mStream.println("Tez vertex " + tezOper.getOperatorKey().toString()); @@ -86,29 +90,36 @@ public class TezPrinter extends TezOpPla printer.setVerbose(isVerbose); printer.visit(); mStream.println(); + } else if (!tezOper.isVertexGroup()) { + // For things like NativeTezOper + mStream.println("" + tezOper); } } /** * This class prints the Tez Vertex Graph */ - public static class TezGraphPrinter extends TezOpPlanVisitor { + public static class TezVertexGraphPrinter extends TezOpPlanVisitor { - StringBuffer buf; + StringBuilder buf; - public TezGraphPrinter(TezOperPlan plan) { + public TezVertexGraphPrinter(TezOperPlan plan) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true)); - buf = new StringBuffer(); + buf = new StringBuilder(); } @Override public void visitTezOp(TezOperator tezOper) throws VisitorException { + writePlan(mPlan, tezOper, buf); + } + + public static void writePlan(TezOperPlan plan, TezOperator tezOper, StringBuilder buf) { if (tezOper.isVertexGroup()) { buf.append("Tez vertex group " + tezOper.getOperatorKey().toString()); } else { buf.append("Tez vertex " + tezOper.getOperatorKey().toString()); } - List<TezOperator> succs = mPlan.getSuccessors(tezOper); + List<TezOperator> succs = plan.getSuccessors(tezOper); if (succs != null) { Collections.sort(succs); buf.append("\t->\t"); @@ -121,6 +132,43 @@ public class TezPrinter extends TezOpPla } } buf.append("\n"); + } + + @Override + public String toString() { + buf.append("\n"); + return buf.toString(); + } + } + + /** + * This class prints the Tez DAG Graph + */ + public static class TezDAGGraphPrinter extends TezPlanContainerVisitor { + + StringBuilder buf; + + public TezDAGGraphPrinter(TezPlanContainer plan) { + super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan, true)); + buf = new StringBuilder(); + } + + @Override + public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException { + writePlan(mPlan, tezPlanContainerNode, buf); + } + + public static void writePlan(TezPlanContainer mPlan, TezPlanContainerNode tezPlanContainerNode, StringBuilder buf) { + buf.append("Tez DAG " + tezPlanContainerNode.getOperatorKey().toString()); + List<TezPlanContainerNode> succs = mPlan.getSuccessors(tezPlanContainerNode); + if (succs != null) { + Collections.sort(succs); + buf.append("\t->\t"); + for (TezPlanContainerNode op : succs) { + buf.append("Tez DAG " + op.getOperatorKey().toString()).append(","); + } + } + buf.append("\n"); } @Override Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Mar 4 18:17:39 2016 @@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.tez.runtime.api.LogicalInput; @@ -56,6 +55,7 @@ public class POCounterStatsTez extends P // KeyValuesReader. After TEZ-661, switch to unsorted shuffle private transient KeyValuesReader reader; private transient KeyValueWriter writer; + private transient boolean finished = false; public POCounterStatsTez(OperatorKey k) { super(k); @@ -123,6 +123,9 @@ public class POCounterStatsTez extends P @Override public Result getNextTuple() throws ExecException { try { + if (finished) { + return RESULT_EOP; + } Map<Integer, Long> counterRecords = new HashMap<Integer, Long>(); Integer key = null; Long value = null; @@ -148,9 +151,10 @@ public class POCounterStatsTez extends P prevTasksCount += counterRecords.get(i); } - Tuple tuple = TupleFactory.getInstance().newTuple(1); + Tuple tuple = mTupleFactory.newTuple(1); tuple.set(0, counterOffsets); writer.write(POValueOutputTez.EMPTY_KEY, tuple); + finished = true; return RESULT_EOP; } catch (IOException e) { throw new ExecException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Mar 4 18:17:39 2016 @@ -53,11 +53,13 @@ public class POFRJoinTez extends POFRJoi private static final Log log = LogFactory.getLog(POFRJoinTez.class); private static final long serialVersionUID = 1L; - // For replicated tables - private List<LogicalInput> replInputs = Lists.newArrayList(); - private List<KeyValueReader> replReaders = Lists.newArrayList(); private List<String> inputKeys; + + // For replicated tables + private transient List<LogicalInput> replInputs; + private transient List<KeyValueReader> replReaders; private transient boolean isInputCached; + private transient String cacheKey; public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws ExecException { super(copy); @@ -71,14 +73,14 @@ public class POFRJoinTez extends POFRJoi @Override public void replaceInput(String oldInputKey, String newInputKey) { - if (inputKeys.remove(oldInputKey)) { + while (inputKeys.remove(oldInputKey)) { inputKeys.add(newInputKey); } } @Override public void addInputsToSkip(Set<String> inputsToSkip) { - String cacheKey = "replicatemap-" + getOperatorKey().toString(); + cacheKey = "replicatemap-" + inputKeys.toString(); Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); if (cacheValue != null) { isInputCached = true; @@ -93,10 +95,14 @@ public class POFRJoinTez extends POFRJoi return; } try { + this.replInputs = Lists.newArrayList(); + this.replReaders = Lists.newArrayList(); for (String key : inputKeys) { LogicalInput input = inputs.get(key); - this.replInputs.add(input); - this.replReaders.add((KeyValueReader) input.getReader()); + if (!this.replInputs.contains(input)) { + this.replInputs.add(input); + this.replReaders.add((KeyValueReader) input.getReader()); + } } } catch (Exception e) { throw new ExecException(e); @@ -110,10 +116,11 @@ public class POFRJoinTez extends POFRJoi */ @Override protected void setUpHashMap() throws ExecException { - String cacheKey = "replicatemap-" + getOperatorKey().toString(); - if (isInputCached) { - Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + // Re-check again in case of Split + union + replicate join + // where same POFRJoinTez occurs in different Split sub-plans + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { replicates = (TupleToMapKey[]) cacheValue; log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey); return; @@ -208,4 +215,10 @@ public class POFRJoinTez extends POFRJoi public List<String> getInputKeys() { return inputKeys; } + + @Override + public POFRJoinTez clone() throws CloneNotSupportedException { + return (POFRJoinTez) super.clone(); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Mar 4 18:17:39 2016 @@ -56,10 +56,12 @@ public class POIdentityInOutTez extends private transient KeyValueReader reader; private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; + private transient boolean finished = false; - public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange) { + public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) { super(inputRearrange); this.mKey = k; + this.inputKey = inputKey; } public void setInputKey(String inputKey) { @@ -121,6 +123,9 @@ public class POIdentityInOutTez extends @Override public Result getNextTuple() throws ExecException { try { + if (finished) { + return RESULT_EOP; + } if (shuffleInput) { while (shuffleReader.next()) { Object curKey = shuffleReader.getCurrentKey(); @@ -152,6 +157,7 @@ public class POIdentityInOutTez extends } } } + finished = true; return RESULT_EOP; } catch (IOException e) { throw new ExecException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Mar 4 18:17:39 2016 @@ -31,9 +31,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.library.api.KeyValueWriter; @@ -48,11 +48,11 @@ public class POLocalRearrangeTez extends private static final Log LOG = LogFactory.getLog(POLocalRearrangeTez.class); protected String outputKey; - protected transient KeyValueWriter writer; - protected boolean connectedToPackage = true; protected boolean isSkewedJoin = false; + protected transient KeyValueWriter writer; + public POLocalRearrangeTez(OperatorKey k) { super(k); } @@ -144,6 +144,13 @@ public class POLocalRearrangeTez extends // assign the tuple to its slot in the projection. key.setIndex(index); val.setIndex(index); + if (isSkewedJoin) { + // Wrap into a NullablePartitionWritable to match the key + // of the right table from POPartitionRearrangeTez for the skewed join + NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key); + wrappedKey.setPartition(-1); + key = wrappedKey; + } writer.write(key, val); } else { illustratorMarkup(res.result, res.result, 0); @@ -164,20 +171,9 @@ public class POLocalRearrangeTez extends return inp; } - /** - * Make a deep copy of this operator. - * @throws CloneNotSupportedException - */ @Override public POLocalRearrangeTez clone() throws CloneNotSupportedException { - POLocalRearrangeTez clone = new POLocalRearrangeTez(new OperatorKey( - mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId( - mKey.scope)), requestedParallelism); - deepCopyTo(clone); - clone.isSkewedJoin = isSkewedJoin; - clone.connectedToPackage = connectedToPackage; - clone.setOutputKey(outputKey); - return clone; + return (POLocalRearrangeTez) super.clone(); } @Override
