Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Feb 24 03:34:37 2017 @@ -32,12 +32,10 @@ import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.hash.Hash; import org.apache.pig.CollectableLoadFunc; import org.apache.pig.FuncSpec; import org.apache.pig.IndexableLoadFunc; @@ -46,10 +44,8 @@ import org.apache.pig.OrderedLoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -86,10 +82,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBloomFilterRearrangeTez; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterStatsTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POCounterTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez; @@ -117,7 +110,6 @@ import org.apache.pig.impl.builtin.GetMe import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; -import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.Operator; @@ -175,10 +167,6 @@ public class TezCompiler extends PhyPlan private Map<PhysicalOperator, TezOperator> phyToTezOpMap; - // Contains the inputs to operator like join, with the list maintaining the - // same order of join from left to right - private Map<TezOperator, List<TezOperator>> inputsMap; - public static final String USER_COMPARATOR_MARKER = "user.comparator.func:"; public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold"; public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation"; @@ -187,8 +175,6 @@ public class TezCompiler extends PhyPlan private boolean optimisticFileConcatenation = false; private List<String> readOnceLoadFuncs = null; - private Configuration conf; - private POLocalRearrangeTezFactory localRearrangeFactory; public TezCompiler(PhysicalPlan plan, PigContext pigContext) @@ -198,7 +184,6 @@ public class TezCompiler extends PhyPlan this.pigContext = pigContext; pigProperties = pigContext.getProperties(); - conf = ConfigurationUtil.toConfiguration(pigProperties, false); splitsSeen = Maps.newHashMap(); tezPlan = new TezOperPlan(); nig = NodeIdGenerator.getGenerator(); @@ -212,7 +197,6 @@ public class TezCompiler extends PhyPlan scope = roots.get(0).getOperatorKey().getScope(); localRearrangeFactory = new POLocalRearrangeTezFactory(scope, nig); phyToTezOpMap = Maps.newHashMap(); - inputsMap = Maps.newHashMap(); fileConcatenationThreshold = Integer.parseInt(pigProperties .getProperty(FILE_CONCATENATION_THRESHOLD, "100")); @@ -671,8 +655,15 @@ public class TezCompiler extends PhyPlan blocking(); TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp); - TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey()); - edge.setNeedsDistinctCombiner(true); + // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented + // with a global variable and a specific DistinctCombiner class. This seems better. + PhysicalPlan combinePlan = curTezOp.inEdges.get(lastOp.getOperatorKey()).combinePlan; + addDistinctPlan(combinePlan, 1); + + POLocalRearrangeTez clr = localRearrangeFactory.create(); + clr.setOutputKey(curTezOp.getOperatorKey().toString()); + clr.setDistinct(true); + combinePlan.addAsLeaf(clr); curTezOp.markDistinct(); addDistinctPlan(curTezOp.plan, op.getRequestedParallelism()); @@ -865,7 +856,6 @@ public class TezCompiler extends PhyPlan } else { curTezOp.plan.addAsLeaf(op); } - phyToTezOpMap.put(op, curTezOp); } catch (Exception e) { int errCode = 2034; @@ -910,7 +900,6 @@ public class TezCompiler extends PhyPlan public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException { try { blocking(); - inputsMap.put(curTezOp, new ArrayList<>(Arrays.asList(compiledInputs))); TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp); curTezOp.setRequestedParallelism(op.getRequestedParallelism()); if (op.isCross()) { @@ -1099,7 +1088,7 @@ public class TezCompiler extends PhyPlan indexerTezOp.setDontEstimateParallelism(true); POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(pigContext); + FileSpec strFile = getTempFileSpec(); st.setSFile(strFile); indexAggrOper.plan.addAsLeaf(st); indexAggrOper.setClosed(true); @@ -1266,7 +1255,7 @@ public class TezCompiler extends PhyPlan rightTezOprAggr.setDontEstimateParallelism(true); POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(pigContext); + FileSpec strFile = getTempFileSpec(); st.setSFile(strFile); rightTezOprAggr.plan.addAsLeaf(st); rightTezOprAggr.setClosed(true); @@ -1357,9 +1346,6 @@ public class TezCompiler extends PhyPlan } else if (op.getNumInps() > 1) { curTezOp.markCogroup(); } - } else if (op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) { - curTezOp.markRegularJoin(); - addBloomToJoin(op, curTezOp); } } catch (Exception e) { int errCode = 2034; @@ -1368,132 +1354,6 @@ public class TezCompiler extends PhyPlan } } - private void addBloomToJoin(POPackage op, TezOperator curTezOp) throws PlanException { - - List<TezOperator> inputs = inputsMap.get(curTezOp); - TezOperator buildBloomOp; - List<TezOperator> applyBloomOps = new ArrayList<>(); - - String strategy = conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, POBuildBloomRearrangeTez.DEFAULT_BLOOM_STRATEGY); - boolean createBloomInMap = "map".equals(strategy); - if (!createBloomInMap && !strategy.equals("reduce")) { - throw new PlanException(new IllegalArgumentException( - "Invalid value for " - + PigConfiguration.PIG_BLOOMJOIN_STRATEGY + " - " - + strategy + ". Valid values are map and reduce")); - } - int numHash = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, POBuildBloomRearrangeTez.DEFAULT_NUM_BLOOM_HASH_FUNCTIONS); - int vectorSizeBytes = conf.getInt(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, POBuildBloomRearrangeTez.DEFAULT_BLOOM_VECTOR_SIZE_BYTES); - int numBloomFilters = POBuildBloomRearrangeTez.getNumBloomFilters(conf); - int hashType = Hash.parseHashType(conf.get(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, POBuildBloomRearrangeTez.DEFAULT_BLOOM_HASH_TYPE)); - - // We build bloom of the right most input and apply the bloom filter on the left inputs by default. - // But in case of left outer join we build bloom of the left input and use it on the right input - boolean[] inner = op.getPkgr().getInner(); - boolean skipNullKeys = true; - if (inner[inner.length - 1]) { // inner has from right to left while inputs has from left to right - buildBloomOp = inputs.get(inputs.size() - 1); // Bloom filter is built from right most input - for (int i = 0; i < (inner.length - 1); i++) { - applyBloomOps.add(inputs.get(i)); - } - skipNullKeys = inner[0]; - } else { - // Left outer join - skipNullKeys = false; - buildBloomOp = inputs.get(0); // Bloom filter is built from left most input - for (int i = 1; i < inner.length; i++) { - applyBloomOps.add(inputs.get(i)); - } - } - - // Add BuildBloom operator to the input - POLocalRearrangeTez lr = (POLocalRearrangeTez) buildBloomOp.plan.getLeaves().get(0); - POBuildBloomRearrangeTez bbr = new POBuildBloomRearrangeTez(lr, createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType); - bbr.setSkipNullKeys(skipNullKeys); - buildBloomOp.plan.remove(lr); - buildBloomOp.plan.addAsLeaf(bbr); - - // Add a new reduce vertex that will construct the final bloom filter - // - by combining the bloom filters from the buildBloomOp input tasks in the map strategy - // - or directly from the keys from the buildBloomOp input tasks in the reduce strategy - TezOperator combineBloomOp = getTezOp(); - tezPlan.add(combineBloomOp); - combineBloomOp.markBuildBloom(); - // Explicitly set the parallelism for the new vertex to number of bloom filters. - // Auto parallelism will bring it down based on the actual output size - combineBloomOp.setEstimatedParallelism(numBloomFilters); - // We don't want parallelism to be changed during the run by grace auto parallelism - // It will take the whole input size and estimate way higher - combineBloomOp.setDontEstimateParallelism(true); - - String combineBloomOpKey = combineBloomOp.getOperatorKey().toString(); - TezEdgeDescriptor edge = new TezEdgeDescriptor(); - TezCompilerUtil.connect(tezPlan, buildBloomOp, combineBloomOp, edge); - bbr.setBloomOutputKey(combineBloomOpKey); - - - POPackage pkg = new POPackage(OperatorKey.genOpKey(scope)); - pkg.setNumInps(1); - BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);; - pkgr.setKeyType(DataType.INTEGER); - pkg.setPkgr(pkgr); - POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope)); - combineBloomOp.plan.addAsLeaf(pkg); - combineBloomOp.plan.addAsLeaf(combineBloomOutput); - - edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); - edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); - - // Add combiner as well. - POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope)); - BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType); - combinerPkgr.setCombiner(true); - combinerPkgr.setKeyType(DataType.INTEGER); - pkg_c.setPkgr(combinerPkgr); - pkg_c.setNumInps(1); - edge.combinePlan.addAsLeaf(pkg_c); - POProject prjKey = new POProject(OperatorKey.genOpKey(scope)); - prjKey.setResultType(DataType.INTEGER); - List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>(); - PhysicalPlan pp = new PhysicalPlan(); - pp.add(prjKey); - clrInps.add(pp); - POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER); - clr.setOutputKey(combineBloomOpKey); - edge.combinePlan.addAsLeaf(clr); - - if (createBloomInMap) { - // No combiner needed on map as there will be only one bloom filter per map for each partition - // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager - edge.setCombinerInMap(false); - edge.setCombinerInReducer(true); - } else { - pkgr.setBloomKeyType(op.getPkgr().getKeyType()); - // Do distinct of the keys on the map side to reduce data sent to reducers. - // In case of reduce, not adding a combiner and doing the distinct during reduce itself. - // If needed one can be added later - edge.setCombinerInMap(true); - edge.setCombinerInReducer(false); - } - - // Broadcast the final bloom filter to other inputs - for (TezOperator applyBloomOp : applyBloomOps) { - applyBloomOp.markFilterBloom(); - lr = (POLocalRearrangeTez) applyBloomOp.plan.getLeaves().get(0); - POBloomFilterRearrangeTez bfr = new POBloomFilterRearrangeTez(lr, numBloomFilters); - applyBloomOp.plan.remove(lr); - applyBloomOp.plan.addAsLeaf(bfr); - bfr.setInputKey(combineBloomOpKey); - edge = new TezEdgeDescriptor(); - edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); - edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); - TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); - TezCompilerUtil.connect(tezPlan, combineBloomOp, applyBloomOp, edge); - combineBloomOutput.addOutputKey(applyBloomOp.getOperatorKey().toString()); - } - - } - @Override public void visitPOForEach(POForEach op) throws VisitorException{ try{ @@ -1653,7 +1513,7 @@ public class TezCompiler extends PhyPlan for (int i=0; i<transformPlans.size(); i++) { eps1.add(transformPlans.get(i)); - flat1.add(i == transformPlans.size() - 1 ? true : false); + flat1.add(true); } // This foreach will pick the sort key columns from the POPoissonSample output @@ -1862,7 +1722,7 @@ public class TezCompiler extends PhyPlan * @return * @throws IOException */ - public static FileSpec getTempFileSpec(PigContext pigContext) throws IOException { + private FileSpec getTempFileSpec() throws IOException { return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(), new FuncSpec(Utils.getTmpFileCompressorName(pigContext))); }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Feb 24 03:34:37 2017 @@ -31,13 +31,8 @@ import org.apache.tez.runtime.library.ou * Descriptor for Tez edge. It holds combine plan as well as edge properties. */ public class TezEdgeDescriptor implements Serializable { - - public transient PhysicalPlan combinePlan; - private boolean needsDistinctCombiner; - // Combiner runs on both input and output of Tez edge by default - // It can be configured to run only in output(map) or input(reduce) - private Boolean combinerInMap; - private Boolean combinerInReducer; + // Combiner runs on both input and output of Tez edge. + transient public PhysicalPlan combinePlan; public String inputClassName; public String outputClassName; @@ -70,30 +65,6 @@ public class TezEdgeDescriptor implement dataMovementType = DataMovementType.SCATTER_GATHER; } - public boolean needsDistinctCombiner() { - return needsDistinctCombiner; - } - - public void setNeedsDistinctCombiner(boolean nic) { - needsDistinctCombiner = nic; - } - - public Boolean getCombinerInMap() { - return combinerInMap; - } - - public void setCombinerInMap(Boolean combinerInMap) { - this.combinerInMap = combinerInMap; - } - - public Boolean getCombinerInReducer() { - return combinerInReducer; - } - - public void setCombinerInReducer(Boolean combinerInReducer) { - this.combinerInReducer = combinerInReducer; - } - public boolean isUseSecondaryKey() { return useSecondaryKey; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Feb 24 03:34:37 2017 @@ -25,9 +25,8 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.LinkedHashSet; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -218,12 +217,8 @@ public class TezOperPlan extends Operato newPlan.add(node); } - // Using a LinkedHashSet and doing a sort so that - // test plan printed remains same between jdk7 and jdk8 - Set<Pair<TezOperator, TezOperator>> toReconnect = new LinkedHashSet<Pair<TezOperator, TezOperator>>(); - List<TezOperator> fromEdges = new ArrayList<>(mFromEdges.keySet()); - Collections.sort(fromEdges); - for (TezOperator from : fromEdges) { + Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>(); + for (TezOperator from : mFromEdges.keySet()) { List<TezOperator> tos = mFromEdges.get(from); for (TezOperator to : tos) { if (list.contains(from) || list.contains(to)) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Feb 24 03:34:37 2017 @@ -181,11 +181,7 @@ public class TezOperator extends Operato // Indicate if this job is a native job NATIVE, // Indicate if this job does rank counter - RANK_COUNTER, - // Indicate if this job constructs bloom filter - BUILDBLOOM, - // Indicate if this job applies bloom filter - FILTERBLOOM; + RANK_COUNTER; }; // Features in the job/vertex. Mostly will be only one feature. @@ -239,7 +235,6 @@ public class TezOperator extends Operato } private LoaderInfo loaderInfo = new LoaderInfo(); - private long totalInputFilesSize = -1; public TezOperator(OperatorKey k) { super(k); @@ -457,22 +452,6 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.RANK_COUNTER.ordinal()); } - public boolean isBuildBloom() { - return feature.get(OPER_FEATURE.BUILDBLOOM.ordinal()); - } - - public void markBuildBloom() { - feature.set(OPER_FEATURE.BUILDBLOOM.ordinal()); - } - - public boolean isFilterBloom() { - return feature.get(OPER_FEATURE.FILTERBLOOM.ordinal()); - } - - public void markFilterBloom() { - feature.set(OPER_FEATURE.FILTERBLOOM.ordinal()); - } - public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) { for (OPER_FEATURE opf : OPER_FEATURE.values()) { if (excludeFeatures != null && excludeFeatures.contains(opf)) { @@ -672,14 +651,6 @@ public class TezOperator extends Operato return loaderInfo; } - public long getTotalInputFilesSize() { - return totalInputFilesSize; - } - - public void setTotalInputFilesSize(long totalInputFilesSize) { - this.totalInputFilesSize = totalInputFilesSize; - } - public void setUseGraceParallelism(boolean useGraceParallelism) { this.useGraceParallelism = useGraceParallelism; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Fri Feb 24 03:34:37 2017 @@ -31,7 +31,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -162,7 +161,7 @@ public class TezPOPackageAnnotator exten @Override public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException { POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange; - if (!(lr.isConnectedToPackage() && lr.containsOutputKey(pkgTezOp.getOperatorKey().toString()))) { + if (!(lr.isConnectedToPackage() && lr.getOutputKey().equals(pkgTezOp.getOperatorKey().toString()))) { return; } loRearrangeFound++; @@ -181,9 +180,7 @@ public class TezPOPackageAnnotator exten if(keyInfo == null) keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(); - // For BloomPackager there is only one input, but the - // POBuildBloomRearrangeTez index is that of the join's index and can be non-zero - Integer index = (pkg.getPkgr() instanceof BloomPackager) ? 0 : Integer.valueOf(lrearrange.getIndex()); + Integer index = Integer.valueOf(lrearrange.getIndex()); if(keyInfo.get(index) != null) { if (isPOSplit) { // Case of POSplit having more than one input in case of self join or union @@ -200,20 +197,12 @@ public class TezPOPackageAnnotator exten } - if (pkg.getPkgr() instanceof BloomPackager ) { - keyInfo.put(index, - new Pair<Boolean, Map<Integer, Integer>>( - Boolean.FALSE, new HashMap<Integer, Integer>())); - pkg.getPkgr().setKeyInfo(keyInfo); - } else { - keyInfo.put(index, - new Pair<Boolean, Map<Integer, Integer>>( - lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); - pkg.getPkgr().setKeyInfo(keyInfo); - pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); - pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); - } - + keyInfo.put(index, + new Pair<Boolean, Map<Integer, Integer>>( + lrearrange.isProjectStar(), lrearrange.getProjectedColsMap())); + pkg.getPkgr().setKeyInfo(keyInfo); + pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple()); + pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound()); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Feb 24 03:34:37 2017 @@ -29,14 +29,9 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DependencyOrderWalker; -import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.plan.PlanException; @@ -165,178 +160,100 @@ public class TezPlanContainer extends Op return; } - List<TezOperator> opersToSegment = null; + TezOperator operToSegment = null; + List<TezOperator> succs = new ArrayList<TezOperator>(); try { // Split top down from root to leaves - // Get list of operators closer to the root that can be segmented together - FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan); + SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan); finder.visit(); - opersToSegment = finder.getOperatorsToSegment(); + operToSegment = finder.getOperatorToSegment(); } catch (VisitorException e) { throw new PlanException(e); } - if (!opersToSegment.isEmpty()) { - Set<TezOperator> commonSplitterPredecessors = new HashSet<>(); - for (TezOperator operToSegment : opersToSegment) { - for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) { - commonSplitterPredecessors - .addAll(getCommonSplitterPredecessors(tezOperPlan, - operToSegment, succ)); - } - } - if (commonSplitterPredecessors.isEmpty()) { - List<TezOperator> allSuccs = new ArrayList<TezOperator>(); - // Disconnect all the successors and move them to a new plan - for (TezOperator operToSegment : opersToSegment) { - List<TezOperator> succs = new ArrayList<TezOperator>(); - succs.addAll(tezOperPlan.getSuccessors(operToSegment)); - allSuccs.addAll(succs); - for (TezOperator succ : succs) { - tezOperPlan.disconnect(operToSegment, succ); + if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) { + succs.addAll(tezOperPlan.getSuccessors(operToSegment)); + for (TezOperator succ : succs) { + tezOperPlan.disconnect(operToSegment, succ); + } + for (TezOperator succ : succs) { + try { + if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) { + // Has already been moved to a new plan by previous successor + // as part of dependency. It could have been further split. + // So walk the full plan to find the new plan and connect + TezOperatorFinder finder = new TezOperatorFinder(this, succ); + finder.visit(); + connect(planNode, finder.getPlanContainerNode()); + continue; } - } - TezOperPlan newOperPlan = new TezOperPlan(); - for (TezOperator succ : allSuccs) { + TezOperPlan newOperPlan = new TezOperPlan(); tezOperPlan.moveTree(succ, newOperPlan); - } - TezPlanContainerNode newPlanNode = new TezPlanContainerNode( - generateNodeOperatorKey(), newOperPlan); - add(newPlanNode); - connect(planNode, newPlanNode); - split(newPlanNode); - } else { - // If there is a common splitter predecessor between operToSegment and the successor, - // we have to separate out that split to be able to segment. - // So we store the output of split to a temp store and then change the - // splittees to load from it. - String scope = opersToSegment.get(0).getOperatorKey().getScope(); - for (TezOperator splitter : commonSplitterPredecessors) { - try { - List<TezOperator> succs = new ArrayList<TezOperator>(); - succs.addAll(tezOperPlan.getSuccessors(splitter)); - FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext); - POStore tmpStore = getTmpStore(scope, fileSpec); - // Replace POValueOutputTez with POStore - splitter.plan.remove(splitter.plan.getLeaves().get(0)); - splitter.plan.addAsLeaf(tmpStore); - splitter.segmentBelow = true; - splitter.setSplitter(false); - for (TezOperator succ : succs) { - // Replace POValueInputTez with POLoad - POLoad tmpLoad = getTmpLoad(scope, fileSpec); - succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad); - } - } catch (Exception e) { - throw new PlanException(e); + TezPlanContainerNode newPlanNode = new TezPlanContainerNode( + generateNodeOperatorKey(), newOperPlan); + add(newPlanNode); + connect(planNode, newPlanNode); + split(newPlanNode); + if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) { + // On further split, the successor moved to a new plan container. + // Connect to that + TezOperatorFinder finder = new TezOperatorFinder(this, succ); + finder.visit(); + disconnect(planNode, newPlanNode); + connect(planNode, finder.getPlanContainerNode()); } + } catch (VisitorException e) { + throw new PlanException(e); } } split(planNode); } } - private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor { + private static class SegmentOperatorFinder extends TezOpPlanVisitor { - private List<TezOperator> opersToSegment = new ArrayList<>(); + private TezOperator operToSegment; - public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) { + public SegmentOperatorFinder(TezOperPlan plan) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); } - public List<TezOperator> getOperatorsToSegment() { - return opersToSegment; + public TezOperator getOperatorToSegment() { + return operToSegment; } @Override - public void visitTezOp(TezOperator tezOp) throws VisitorException { - if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) { - if (opersToSegment.isEmpty()) { - opersToSegment.add(tezOp); - } else { - // If the operator does not have dependency on previous - // operators chosen for segmenting then add it to the - // operators to be segmented together - if (!hasPredecessor(tezOp, opersToSegment)) { - opersToSegment.add(tezOp); - } - } + public void visitTezOp(TezOperator tezOperator) throws VisitorException { + if (tezOperator.needSegmentBelow() && operToSegment == null) { + operToSegment = tezOperator; } } - /** - * Check if the tezOp has one of the opsToCheck as a predecessor. - * It can be a immediate predecessor or multiple levels up. - */ - private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) { - List<TezOperator> predecessors = getPlan().getPredecessors(tezOp); - if (predecessors != null) { - for (TezOperator pred : predecessors) { - if (opersToSegment.contains(pred)) { - return true; - } else { - if (hasPredecessor(pred, opsToCheck)) { - return true; - } - } - } - } - return false; - } - } - private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) { - Set<TezOperator> splitters1 = new HashSet<>(); - Set<TezOperator> splitters2 = new HashSet<>(); - Set<TezOperator> processedPredecessors = new HashSet<>(); - // Find predecessors which are splitters - fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1); - if (!splitters1.isEmpty()) { - // For the successor, traverse rest of the plan below it and - // search the predecessors of its successors to find any predecessor that might be a splitter. - Set<TezOperator> allSuccs = new HashSet<>(); - getAllSuccessors(plan, successor, allSuccs); - processedPredecessors.clear(); - processedPredecessors.add(successor); - for (TezOperator succ : allSuccs) { - fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2); - } - // Find the common ones - splitters1.retainAll(splitters2); + private static class TezOperatorFinder extends TezPlanContainerVisitor { + + private TezPlanContainerNode planContainerNode; + private TezOperator operatorToFind; + + public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) { + super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan)); + this.operatorToFind = operatorToFind; } - return splitters1; - } - private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp, - Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) { - List<TezOperator> predecessors = plan.getPredecessors(tezOp); - if (predecessors != null) { - for (TezOperator pred : predecessors) { - // Skip processing already processed predecessor to avoid loops - if (processedPredecessors.contains(pred)) { - continue; - } - if (pred.isSplitter()) { - splitters.add(pred); - } else if (!pred.needSegmentBelow()) { - processedPredecessors.add(pred); - fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters); - } - } + public TezPlanContainerNode getPlanContainerNode() { + return planContainerNode; } - } - private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) { - List<TezOperator> successors = plan.getSuccessors(tezOp); - if (successors != null) { - for (TezOperator succ : successors) { - if (!allSuccs.contains(succ)) { - allSuccs.add(succ); - getAllSuccessors(plan, succ, allSuccs); - } + @Override + public void visitTezPlanContainerNode( + TezPlanContainerNode tezPlanContainerNode) + throws VisitorException { + if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) { + planContainerNode = tezPlanContainerNode; } } + } private synchronized OperatorKey generateNodeOperatorKey() { @@ -350,21 +267,6 @@ public class TezPlanContainer extends Op scopeId = 0; } - private POLoad getTmpLoad(String scope, FileSpec fileSpec){ - POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); - ld.setPc(pigContext); - ld.setIsTmpLoad(true); - ld.setLFile(fileSpec); - return ld; - } - - private POStore getTmpStore(String scope, FileSpec fileSpec){ - POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); - st.setIsTmpStore(true); - st.setSFile(fileSpec); - return new POStoreTez(st); - } - @Override public String toString() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Feb 24 03:34:37 2017 @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; @@ -81,9 +80,6 @@ public class TezPrinter extends TezOpPla printer.setVerbose(isVerbose); printer.visit(); mStream.println(); - } else if (edgeDesc.needsDistinctCombiner()) { - mStream.println("# Combine plan on edge <" + inEdge + ">"); - mStream.println(DistinctCombiner.Combine.class.getName()); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Feb 24 03:34:37 2017 @@ -56,7 +56,6 @@ public class POCounterStatsTez extends P private transient KeyValuesReader reader; private transient KeyValueWriter writer; private transient boolean finished = false; - private transient boolean hasNext = false; public POCounterStatsTez(OperatorKey k) { super(k); @@ -89,7 +88,6 @@ public class POCounterStatsTez extends P try { reader = (KeyValuesReader) input.getReader(); LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader); - hasNext = reader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -132,13 +130,12 @@ public class POCounterStatsTez extends P Integer key = null; Long value = null; // Read count of records per task - while (hasNext) { + while (reader.next()) { key = ((IntWritable)reader.getCurrentKey()).get(); for (Object val : reader.getCurrentValues()) { value = ((LongWritable)val).get(); counterRecords.put(key, value); } - hasNext = reader.next(); } // BinInterSedes only takes String for map key Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Feb 24 03:34:37 2017 @@ -19,8 +19,6 @@ package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -103,13 +101,9 @@ public class POFRJoinTez extends POFRJoi LogicalInput input = inputs.get(key); if (!this.replInputs.contains(input)) { this.replInputs.add(input); - KeyValueReader reader = (KeyValueReader) input.getReader(); - this.replReaders.add(reader); - log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader); + this.replReaders.add((KeyValueReader) input.getReader()); } } - // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have - // multiple POFRJoinTez loading same replicate input and will skip records } catch (Exception e) { throw new ExecException(e); } @@ -120,7 +114,6 @@ public class POFRJoinTez extends POFRJoi * * @throws ExecException */ - @SuppressWarnings("unchecked") @Override protected void setUpHashMap() throws ExecException { @@ -128,8 +121,8 @@ public class POFRJoinTez extends POFRJoi // where same POFRJoinTez occurs in different Split sub-plans Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); if (cacheValue != null) { - replicates = (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue; - log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey); + replicates = (TupleToMapKey[]) cacheValue; + log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey); return; } @@ -155,7 +148,7 @@ public class POFRJoinTez extends POFRJoi long time1 = System.currentTimeMillis(); - replicates.set(fragment, null); + replicates[fragment] = null; int inputIdx = 0; // We need to adjust the index because the number of replInputs is // one less than the number of inputSchemas. The inputSchemas @@ -165,12 +158,7 @@ public class POFRJoinTez extends POFRJoi SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx]; SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx]; - Map<Object, ArrayList<Tuple>> replicate; - if (keySchemaTupleFactory == null) { - replicate = new HashMap<Object, ArrayList<Tuple>>(4000); - } else { - replicate = new TupleToMapKey(4000, keySchemaTupleFactory); - } + TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory); POLocalRearrange lr = LRs[schemaIdx]; try { @@ -180,8 +168,7 @@ public class POFRJoinTez extends POFRJoi } PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey(); - Object keyValue = key.getValueAsPigType(); - if (isKeyNull(keyValue)) continue; + if (isKeyNull(key.getValueAsPigType())) continue; NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue(); // POFRJoin#getValueTuple() is reused to construct valTuple, @@ -189,31 +176,27 @@ public class POFRJoinTez extends POFRJoi // construct one here. Tuple retTuple = mTupleFactory.newTuple(3); retTuple.set(0, key.getIndex()); - retTuple.set(1, keyValue); + retTuple.set(1, key.getValueAsPigType()); retTuple.set(2, val.getValueAsPigType()); Tuple valTuple = getValueTuple(lr, retTuple); - ArrayList<Tuple> values = replicate.get(keyValue); - if (values == null) { - if (inputSchemaTupleFactory == null) { - values = new ArrayList<Tuple>(1); - } else { - values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory); - } - replicate.put(keyValue, values); + Tuple keyTuple = mTupleFactory.newTuple(1); + keyTuple.set(0, key.getValueAsPigType()); + if (replicate.get(keyTuple) == null) { + replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); } - values.add(valTuple); + replicate.get(keyTuple).add(valTuple); } } catch (IOException e) { throw new ExecException(e); } - replicates.set(schemaIdx, replicate); + replicates[schemaIdx] = replicate; inputIdx++; schemaIdx++; } long time2 = System.currentTimeMillis(); - log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1)); + log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1)); ObjectCache.getInstance().cache(cacheKey, replicates); log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Feb 24 03:34:37 2017 @@ -57,7 +57,6 @@ public class POIdentityInOutTez extends private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; private transient boolean finished = false; - private transient boolean hasNext = false; public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) { super(inputRearrange); @@ -96,12 +95,9 @@ public class POIdentityInOutTez extends Reader r = input.getReader(); if (r instanceof KeyValueReader) { reader = (KeyValueReader) r; - // Force input fetch - hasNext = reader.next(); } else { shuffleInput = true; shuffleReader = (KeyValuesReader) r; - hasNext = shuffleReader.next(); } LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r); } catch (Exception e) { @@ -131,7 +127,7 @@ public class POIdentityInOutTez extends return RESULT_EOP; } if (shuffleInput) { - while (hasNext) { + while (shuffleReader.next()) { Object curKey = shuffleReader.getCurrentKey(); Iterable<Object> vals = shuffleReader.getCurrentValues(); if (isSkewedJoin) { @@ -143,10 +139,9 @@ public class POIdentityInOutTez extends for (Object val : vals) { writer.write(curKey, val); } - hasNext = shuffleReader.next(); } } else { - while (hasNext) { + while (reader.next()) { if (isSkewedJoin) { NullablePartitionWritable wrappedKey = new NullablePartitionWritable( (PigNullableWritable) reader.getCurrentKey()); @@ -160,7 +155,6 @@ public class POIdentityInOutTez extends writer.write(reader.getCurrentKey(), reader.getCurrentValue()); } - hasNext = reader.next(); } } finished = true; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Feb 24 03:34:37 2017 @@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends } } - public boolean containsOutputKey(String key) { - return outputKey.equals(key); + public String getOutputKey() { + return outputKey; } public void setOutputKey(String outputKey) { @@ -122,10 +122,6 @@ public class POLocalRearrangeTez extends } } - protected Result getRearrangedTuple() throws ExecException { - return super.getNextTuple(); - } - @Override public Result getNextTuple() throws ExecException { res = super.getNextTuple(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Feb 24 03:34:37 2017 @@ -51,7 +51,6 @@ public class PORankTez extends PORank im private transient Map<Integer, Long> counterOffsets; private transient Configuration conf; private transient boolean finished = false; - private transient Boolean hasFirstRecord; public PORankTez(PORank copy) { super(copy); @@ -101,7 +100,6 @@ public class PORankTez extends PORank im try { reader = (KeyValueReader) input.getReader(); LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader); - hasFirstRecord = reader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -142,18 +140,9 @@ public class PORankTez extends PORank im Result inp = null; try { - if (hasFirstRecord != null) { - if (hasFirstRecord) { - hasFirstRecord = null; - inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); - return addRank(inp); - } - hasFirstRecord = null; - } else { - while (reader.next()) { - inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); - return addRank(inp); - } + while (reader.next()) { + inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); + return addRank(inp); } } catch (IOException e) { throw new ExecException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Feb 24 03:34:37 2017 @@ -25,8 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; import org.apache.pig.backend.executionengine.ExecException; @@ -34,16 +32,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil; import org.apache.pig.data.AccumulativeBag; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalCachedBag; -import org.apache.pig.data.ReadOnceBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -54,7 +48,6 @@ import org.apache.tez.runtime.library.co public class POShuffleTezLoad extends POPackage implements TezInput { private static final long serialVersionUID = 1L; - private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class); protected List<String> inputKeys = new ArrayList<String>(); private boolean isSkewedJoin = false; @@ -68,7 +61,6 @@ public class POShuffleTezLoad extends PO private transient WritableComparator groupingComparator = null; private transient Configuration conf; private transient int accumulativeBatchSize; - private transient boolean readOnceOneBag; public POShuffleTezLoad(POPackage pack) { super(pack); @@ -109,10 +101,7 @@ public class POShuffleTezLoad extends PO // - Input key will be repeated, but index would be same within a TezInput if (!this.inputs.contains(input)) { this.inputs.add(input); - KeyValuesReader reader = (KeyValuesReader)input.getReader(); - this.readers.add(reader); - LOG.info("Attached input from vertex " + inputKey - + " : input=" + input + ", reader=" + reader); + this.readers.add((KeyValuesReader)input.getReader()); } } @@ -128,13 +117,6 @@ public class POShuffleTezLoad extends PO for (int i = 0; i < numTezInputs; i++) { finished[i] = !readers.get(i).next(); } - - this.readOnceOneBag = (numInputs == 1) - && (pkgr instanceof CombinerPackager - || pkgr instanceof LitePackager || pkgr instanceof BloomPackager); - if (readOnceOneBag) { - readOnce[0] = true; - } } catch (Exception e) { throw new ExecException(e); } @@ -205,47 +187,43 @@ public class POShuffleTezLoad extends PO } else { - if (readOnceOneBag) { - bags[0] = new TezReadOnceBag(pkgr, min); - } else { - for (int i = 0; i < numInputs; i++) { - bags[i] = new InternalCachedBag(numInputs); - } + for (int i = 0; i < numInputs; i++) { + bags[i] = new InternalCachedBag(numInputs); + } - if (numTezInputs == 1) { - do { - Iterable<Object> vals = readers.get(0).getCurrentValues(); - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bags[index].add(tup); - } - finished[0] = !readers.get(0).next(); - if (finished[0]) { - break; - } - cur = readers.get(0).getCurrentKey(); - } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators - } else { - for (int i = 0; i < numTezInputs; i++) { - if (!finished[i]) { - cur = readers.get(i).getCurrentKey(); - // We need to loop in case of Grouping Comparators - while (groupingComparator.compare(min, cur) == 0) { - Iterable<Object> vals = readers.get(i).getCurrentValues(); - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bags[index].add(tup); - } - finished[i] = !readers.get(i).next(); - if (finished[i]) { - break; - } - cur = readers.get(i).getCurrentKey(); + if (numTezInputs == 1) { + do { + Iterable<Object> vals = readers.get(0).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[0] = !readers.get(0).next(); + if (finished[0]) { + break; + } + cur = readers.get(0).getCurrentKey(); + } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators + } else { + for (int i = 0; i < numTezInputs; i++) { + if (!finished[i]) { + cur = readers.get(i).getCurrentKey(); + // We need to loop in case of Grouping Comparators + while (groupingComparator.compare(min, cur) == 0) { + Iterable<Object> vals = readers.get(i).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[i] = !readers.get(i).next(); + if (finished[i]) { + break; } + cur = readers.get(i).getCurrentKey(); } } } @@ -405,74 +383,4 @@ public class POShuffleTezLoad extends PO } - private class TezReadOnceBag extends ReadOnceBag { - - private static final long serialVersionUID = 1L; - private Iterator<Object> iter; - - public TezReadOnceBag(Packager pkgr, - PigNullableWritable currentKey) throws IOException { - this.pkgr = pkgr; - this.keyWritable = currentKey; - this.iter = readers.get(0).getCurrentValues().iterator(); - } - - @Override - public Iterator<Tuple> iterator() { - return new TezReadOnceBagIterator(); - } - - private class TezReadOnceBagIterator implements Iterator<Tuple> { - - @Override - public boolean hasNext() { - if (iter.hasNext()) { - return true; - } else { - try { - finished[0] = !readers.get(0).next(); - if (finished[0]) { - return false; - } - // Currently combiner is not being applied when secondary key(grouping comparator) is used - // But might change in future. So check if the next key is same and return its values - Object cur = readers.get(0).getCurrentKey(); - if (groupingComparator.compare(keyWritable, cur) == 0) { - iter = readers.get(0).getCurrentValues().iterator(); - // Key should at least have one value. But doing a check just for safety - if (iter.hasNext()) { - return true; - } else { - throw new RuntimeException("Unexpected. Key " + keyWritable + " does not have any values"); - } - } - return false; - } catch (IOException e) { - throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e); - } - } - } - - @Override - public Tuple next() { - NullableTuple ntup = (NullableTuple) iter.next(); - int index = ntup.getIndex(); - Tuple ret = null; - try { - ret = pkgr.getValueTuple(keyWritable, ntup, index); - } catch (ExecException e) { - throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e); - } - return ret; - } - - @Override - public void remove() { - throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed"); - } - } - - } - - } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Feb 24 03:34:37 2017 @@ -57,7 +57,6 @@ public class POShuffledValueInputTez ext private transient Iterator<KeyValueReader> readers; private transient KeyValueReader currentReader; private transient Configuration conf; - private transient Boolean hasFirstRecord; public POShuffledValueInputTez(OperatorKey k) { super(k); @@ -99,8 +98,6 @@ public class POShuffledValueInputTez ext } readers = readersList.iterator(); currentReader = readers.next(); - // Force input fetch - hasFirstRecord = currentReader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -114,15 +111,7 @@ public class POShuffledValueInputTez ext } do { - if (hasFirstRecord != null) { - if (hasFirstRecord) { - hasFirstRecord = null; - Tuple origTuple = (Tuple) currentReader.getCurrentValue(); - Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); - return new Result(POStatus.STATUS_OK, copy); - } - hasFirstRecord = null; - } else if (currentReader.next()) { + if (currentReader.next()) { Tuple origTuple = (Tuple) currentReader.getCurrentValue(); Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); return new Result(POStatus.STATUS_OK, copy); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 03:34:37 2017 @@ -60,8 +60,6 @@ public class POSimpleTezLoad extends POL private transient Configuration conf; private transient boolean finished = false; private transient TezCounter inputRecordCounter; - private transient boolean initialized; - private transient boolean noTupleCopy; public POSimpleTezLoad(OperatorKey k, LoadFunc loader) { super(k, loader); @@ -151,13 +149,7 @@ public class POSimpleTezLoad extends POL } else { Result res = new Result(); Tuple next = (Tuple) reader.getCurrentValue(); - if (!initialized) { - noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next); - initialized = true; - } - // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple - // In that case copy to BinSedesTuple - res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll()); + res.result = next; res.returnStatus = POStatus.STATUS_OK; if (inputRecordCounter != null) { inputRecordCounter.increment(1); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Feb 24 03:34:37 2017 @@ -102,19 +102,19 @@ public class POStoreTez extends POStore throw new ExecException(e); } - // Even if there is a single hdfs output, we add multi store counter - // Makes it easier for user to see records for a particular store from - // the DAG counter - CounterGroup multiStoreGroup = processorContext.getCounters() - .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); - if (multiStoreGroup == null) { - processorContext.getCounters().addGroup( - MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, - MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); - } - String name = MRPigStatsUtil.getMultiStoreCounterName(this); - if (name != null) { - outputRecordCounter = multiStoreGroup.addCounter(name, name, 0); + // Multiple outputs - can be another store or other outputs (shuffle, broadcast) + if (outputs.size() > 1) { + CounterGroup multiStoreGroup = processorContext.getCounters() + .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); + if (multiStoreGroup == null) { + processorContext.getCounters().addGroup( + MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, + MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); + } + String name = MRPigStatsUtil.getMultiStoreCounterName(this); + if (name != null) { + outputRecordCounter = multiStoreGroup.addCounter(name, name, 0); + } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Feb 24 03:34:37 2017 @@ -57,7 +57,6 @@ public class POValueInputTez extends Phy private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; private transient boolean hasNext; - private transient Boolean hasFirstRecord; public POValueInputTez(OperatorKey k) { super(k); @@ -93,8 +92,6 @@ public class POValueInputTez extends Phy Reader r = input.getReader(); if (r instanceof KeyValueReader) { reader = (KeyValueReader) r; - // Force input fetch - hasFirstRecord = reader.next(); } else { shuffleInput = true; shuffleReader = (KeyValuesReader) r; @@ -121,22 +118,10 @@ public class POValueInputTez extends Phy } hasNext = shuffleReader.next(); } - } else { - if (hasFirstRecord != null) { - if (hasFirstRecord) { - hasFirstRecord = null; - Tuple origTuple = (Tuple) reader.getCurrentValue(); - Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); - return new Result(POStatus.STATUS_OK, copy); - } - hasFirstRecord = null; - } else { - while (reader.next()) { - Tuple origTuple = (Tuple) reader.getCurrentValue(); - Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); - return new Result(POStatus.STATUS_OK, copy); - } - } + } else if (reader.next()) { + Tuple origTuple = (Tuple) reader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + return new Result(POStatus.STATUS_OK, copy); } finished = true; // For certain operators (such as STREAM), we could still have some work Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Feb 24 03:34:37 2017 @@ -69,11 +69,6 @@ public class CombinerOptimizer extends T } for (TezOperator from : predecessors) { - PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan; - if (!combinePlan.isEmpty()) { - // Cases like bloom join have combine plan already set - continue; - } List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class); if (rearranges.isEmpty()) { continue; @@ -82,7 +77,7 @@ public class CombinerOptimizer extends T POLocalRearrangeTez connectingLR = null; PhysicalPlan rearrangePlan = from.plan; for (POLocalRearrangeTez lr : rearranges) { - if (lr.containsOutputKey(to.getOperatorKey().toString())) { + if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { connectingLR = lr; break; } @@ -95,6 +90,7 @@ public class CombinerOptimizer extends T // Detected the POLocalRearrange -> POPackage pattern. Let's add // combiner if possible. + PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan; CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg); if(!combinePlan.isEmpty()) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Feb 24 03:34:37 2017 @@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.pig.LoadFunc; import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; @@ -66,6 +65,11 @@ public class LoaderProcessor extends Tez this.jobConf.setBoolean("mapred.mapper.new-api", true); this.jobConf.setClass("mapreduce.inputformat.class", PigInputFormat.class, InputFormat.class); + try { + this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc)); + } catch (IOException e) { + throw new VisitorException(e); + } } /** @@ -171,7 +175,6 @@ public class LoaderProcessor extends Tez // splits can be moved to if(loads) block below int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks(); tezOp.setRequestedParallelism(parallelism); - tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job)); } return lds; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Feb 24 03:34:37 2017 @@ -153,8 +153,6 @@ public class MultiQueryOptimizerTez exte } } if (getPlan().getSuccessors(successor) != null) { - nonPackageInputSuccessors.clear(); - toMergeSuccessors.clear(); for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) { if (succSuccessor.isUnion()) { if (!(unionOptimizerOn && @@ -173,13 +171,7 @@ public class MultiQueryOptimizerTez exte continue; } } - if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) { - // Output goes to scalar or POFRJoinTez in the union operator - // We need to ensure it is the only one to avoid parallel edges - canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false; - } else { - toMergeSuccessors.add(succSuccessor); - } + toMergeSuccessors.add(succSuccessor); List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor); if (unionSuccessors != null) { for (TezOperator unionSuccessor : unionSuccessors) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Feb 24 03:34:37 2017 @@ -115,16 +115,11 @@ public class ParallelismSetter extends T } else if (pc.defaultParallel != -1) { parallelism = pc.defaultParallel; } - if (parallelism == 0) { - // We need to produce empty output file. - // Even if user set PARALLEL 0, mapreduce has 1 reducer - parallelism = 1; - } boolean overrideRequestedParallelism = false; if (parallelism != -1 && autoParallelismEnabled - && !tezOp.isDontEstimateParallelism() && tezOp.isIntermediateReducer() + && !tezOp.isDontEstimateParallelism() && tezOp.isOverrideIntermediateParallelism()) { overrideRequestedParallelism = true; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Feb 24 03:34:37 2017 @@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex POLocalRearrangeTez connectingLR = null; PhysicalPlan rearrangePlan = from.plan; for (POLocalRearrangeTez lr : rearranges) { - if (lr.containsOutputKey(to.getOperatorKey().toString())) { + if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { connectingLR = lr; break; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Fri Feb 24 03:34:37 2017 @@ -30,8 +30,6 @@ public class TezEstimatedParallelismClea @Override public void visitTezOp(TezOperator tezOp) throws VisitorException { - if (!tezOp.isDontEstimateParallelism()) { - tezOp.setEstimatedParallelism(-1); - } + tezOp.setEstimatedParallelism(-1); } }
