Author: rohini
Date: Wed Oct 3 22:39:09 2018
New Revision: 1842768

URL: http://svn.apache.org/viewvc?rev=1842768&view=rev
Log:
PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
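The new pig.bloomjoin.nocombiner property lets users skip the distinct combiner that the reduce-side bloom strategy otherwise adds on the edge carrying join keys, which helps when most join keys are already unique. Below is a minimal sketch of how the flag is consulted, assuming a plain Hadoop Configuration in place of the compiled job's configuration; the class name and printed message are illustrative, while the constant and the false default mirror the TezCompiler change in this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;

public class BloomJoinCombinerFlagSketch {
    public static void main(String[] args) {
        // Illustrative stand-in for the configuration of the compiled Tez job.
        Configuration conf = new Configuration();

        // A script would typically set this with: SET pig.bloomjoin.nocombiner true;
        conf.setBoolean(PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER, true);

        // Mirrors the decision in TezCompiler for the reduce-side bloom strategy:
        // the distinct combiner is added only while the flag stays at its default of false.
        boolean needsDistinctCombiner =
                !conf.getBoolean(PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER, false);
        System.out.println("Add distinct combiner on bloom edge: " + needsDistinctCombiner);
    }
}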
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/test/e2e/pig/tests/join.conf
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct 3 22:39:09 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
 
 IMPROVEMENTS
 
+PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
+
 PIG-5349: Log stderr output when shell command fail (knoguchi)
 
 PIG-3038: Support for Credentials for UDF,Loader and Storer (satishsaley via rohini)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Wed Oct 3 22:39:09 2018
@@ -1210,7 +1210,8 @@ gets 1 GB of memory. Please share your o
 used to filter records of the other relations before doing a regular hash join. The amount of data sent to the reducers
 will be a lot less depending up on the numbers of records that are filtered on the map side. Bloom join is
 very useful in cases where the number of matching records between relations in a join are comparatively less
-compared to the total records allowing many to be filtered before the join.
+compared to the total records allowing many to be filtered before the join. Bloom join is also ideal for a right outer join
+with the smaller dataset on the right, which is not supported by replicated join.
 Before bloom join was added as a type of join, same functionality was achieved by users by using the
 <a href="func.html#bloom">builtin bloom udfs</a> which is not as efficient and required more lines of code as well.
 Currently bloom join is only implemented in Tez execution mode. Builtin bloom udfs have to be used for other execution modes.</p>
@@ -1287,9 +1288,10 @@ In this case size of keys sent to the re
 <li>pig.bloomjoin.num.filters - The number of bloom filters that will be created.
 Default is 1 for map strategy and 11 for reduce strategy.</li>
 <li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
 A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
-<li>pig.bloomjoin.hash.functions - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
-<li>pig.bloomjoin.hash.types - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
 Higher the number lower the false positives. Too high a value can increase the cpu time. Default value is 3.</li>
+<li>pig.bloomjoin.nocombiner - Turns off the combiner when most of the join keys are unique. Default is false.</li>
 </ul>
 </section>

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Wed Oct 3 22:39:09 2018
@@ -186,7 +186,7 @@ public class COUNT extends EvalFunc<L
         public Tuple exec(Tuple input) throws IOException {return TupleFactory.getInstance().newTuple(sum(input));}
     }
     static public class Final extends EvalFunc<Long> {
-        public Tuple exec(Tuple input) throws IOException {return sum(input);}
+        public Long exec(Tuple input) throws IOException {return sum(input);}
     }
     static protected Long count(Tuple input) throws ExecException {
         Object values = input.get(0);

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Oct 3 22:39:09 2018
@@ -204,6 +204,10 @@ public class PigConfiguration {
     public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
 
     /**
+     * Turns off the combiner when most of the join keys are unique.
+ */ + public static final String PIG_BLOOMJOIN_NOCOMBINER = "pig.bloomjoin.nocombiner"; + /** * This key used to control the maximum size loaded into * the distributed cache when doing fragment-replicated join */ Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Wed Oct 3 22:39:09 2018 @@ -26,10 +26,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -132,6 +134,12 @@ public class EndOfAllInputSetter extends super.visitLocalRearrange(lr); } + @Override + public void visitPackage(POPackage pkg) throws VisitorException { + if (pkg.getPkgr() instanceof BloomPackager) { + endOfAllInputFlag = true; + } + } /** * @return if end of all input is present */ Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Oct 3 22:39:09 2018 @@ -49,6 +49,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder; @@ -109,6 +110,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.BloomFilterPartitioner; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.WeightedRangePartitionerTez; import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; @@ -119,6 +121,7 @@ import org.apache.pig.impl.builtin.Parti import org.apache.pig.impl.builtin.TezIndexableLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.io.NullableBytesWritable; import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -134,6 +137,7 @@ import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.Utils; import org.apache.pig.newplan.logical.relational.LOJoin; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; import org.apache.tez.runtime.library.input.UnorderedKVInput; import org.apache.tez.runtime.library.output.UnorderedKVOutput; @@ -1452,46 +1456,45 @@ public class TezCompiler extends PhyPlan POPackage pkg = new POPackage(OperatorKey.genOpKey(scope)); pkg.setNumInps(1); - BloomPackager pkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType);; - pkgr.setKeyType(DataType.INTEGER); + BloomPackager pkgr = new BloomPackager(createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType); pkg.setPkgr(pkgr); POValueOutputTez combineBloomOutput = new POValueOutputTez(OperatorKey.genOpKey(scope)); combineBloomOp.plan.addAsLeaf(pkg); combineBloomOp.plan.addAsLeaf(combineBloomOutput); - edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); - edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName()); - - // Add combiner as well. - POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope)); - BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, vectorSizeBytes, numHash, hashType); - combinerPkgr.setCombiner(true); - combinerPkgr.setKeyType(DataType.INTEGER); - pkg_c.setPkgr(combinerPkgr); - pkg_c.setNumInps(1); - edge.combinePlan.addAsLeaf(pkg_c); - POProject prjKey = new POProject(OperatorKey.genOpKey(scope)); - prjKey.setResultType(DataType.INTEGER); - List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>(); - PhysicalPlan pp = new PhysicalPlan(); - pp.add(prjKey); - clrInps.add(pp); - POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER); - clr.setOutputKey(combineBloomOpKey); - edge.combinePlan.addAsLeaf(clr); - if (createBloomInMap) { + pkgr.setKeyType(DataType.INTEGER); + edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName()); + edge.setIntermediateOutputKeyComparatorClass( + PigWritableComparators.PigIntRawBytesComparator.class.getName()); + // Add combiner as well. Each of the bloom filter is 1 MB by default. 
When there are + // 100s of mappers producing bloom filter, it is better to have combiner + // on the reduce side. + POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope)); + pkg_c.setPkgr(new BloomPackager(createBloomInMap, numBloomFilters, vectorSizeBytes, numHash, hashType)); + pkg_c.getPkgr().setKeyType(DataType.INTEGER); + pkg_c.setNumInps(1); + edge.combinePlan.addAsLeaf(pkg_c); + POProject prjKey = new POProject(OperatorKey.genOpKey(scope)); + prjKey.setResultType(DataType.INTEGER); + List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>(); + PhysicalPlan pp = new PhysicalPlan(); + pp.add(prjKey); + clrInps.add(pp); + POLocalRearrangeTez clr = localRearrangeFactory.create(0, LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER); + clr.setOutputKey(combineBloomOpKey); + edge.combinePlan.addAsLeaf(clr); // No combiner needed on map as there will be only one bloom filter per map for each partition // In the reducer, the bloom filters will be combined with same logic of reduce in BloomPackager edge.setCombinerInMap(false); edge.setCombinerInReducer(true); } else { - pkgr.setBloomKeyType(op.getPkgr().getKeyType()); + pkgr.setKeyType(DataType.BYTEARRAY); + edge.setIntermediateOutputKeyClass(NullableBytesWritable.class.getName()); + edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigBytesRawBytesComparator.class.getName()); + edge.partitionerClass = BloomFilterPartitioner.class; // Do distinct of the keys on the map side to reduce data sent to reducers. - // In case of reduce, not adding a combiner and doing the distinct during reduce itself. - // If needed one can be added later - edge.setCombinerInMap(true); - edge.setCombinerInReducer(false); + edge.setNeedsDistinctCombiner(!conf.getBoolean(PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER, false)); } // Broadcast the final bloom filter to other inputs Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Wed Oct 3 22:39:09 2018 @@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.HashSet; import java.util.Iterator; import org.apache.hadoop.util.bloom.BloomFilter; @@ -38,32 +37,27 @@ import org.apache.pig.data.Tuple; public class BloomPackager extends Packager { private static final long serialVersionUID = 1L; + private static final Result RESULT_EMPTY = new Result(POStatus.STATUS_NULL, null); + private static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, null); private boolean bloomCreatedInMap; private int vectorSizeBytes; + private int numBloomFilters; private int numHash; private int hashType; - private byte bloomKeyType; - private boolean isCombiner; private transient ByteArrayOutputStream baos; - private transient Iterator<Object> distinctKeyIter; + private transient BloomFilter[] bloomFilters; + private transient int nextFilterIdx; - public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes, + public BloomPackager(boolean bloomCreatedInMap, int 
numBloomFilters, int vectorSizeBytes, int numHash, int hashType) { super(); this.bloomCreatedInMap = bloomCreatedInMap; this.vectorSizeBytes = vectorSizeBytes; this.numHash = numHash; this.hashType = hashType; - } - - public void setBloomKeyType(byte keyType) { - bloomKeyType = keyType; - } - - public void setCombiner(boolean isCombiner) { - this.isCombiner = isCombiner; + this.numBloomFilters = numBloomFilters; } @Override @@ -78,21 +72,26 @@ public class BloomPackager extends Packa @Override public Result getNext() throws ExecException { try { + if (bags == null) { + return new Result(POStatus.STATUS_EOP, null); + } if (bloomCreatedInMap) { - if (bags == null) { - return new Result(POStatus.STATUS_EOP, null); - } - // Same function for combiner and reducer return combineBloomFilters(); } else { - if (isCombiner) { - return getDistinctBloomKeys(); - } else { - if (bags == null) { - return new Result(POStatus.STATUS_EOP, null); - } - return createBloomFilter(); + if (parent.isEndOfAllInput()) { + return retrieveBloomFilter(); } + if (!bags[0].iterator().hasNext()) { + return new Result(POStatus.STATUS_EOP, null); + } + if (bloomFilters == null) { // init + bloomFilters = new BloomFilter[numBloomFilters]; + } + // Create the bloom filters from the keys + Tuple tup = bags[0].iterator().next(); + addKeyToBloomFilter(key, (int) tup.get(0)); + detachInput(); + return RESULT_EMPTY; } } catch (IOException e) { throw new ExecException("Error while constructing final bloom filter", e); @@ -116,28 +115,6 @@ public class BloomPackager extends Packa return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length); } - private Result createBloomFilter() throws IOException { - // We get a bag of keys. Create a bloom filter from them - // First do distinct of the keys. Not using DistinctBag as memory should not be a problem. 
- HashSet<Object> bloomKeys = new HashSet<>(); - Iterator<Tuple> iter = bags[0].iterator(); - while (iter.hasNext()) { - bloomKeys.add(iter.next().get(0)); - } - - Object partition = key; - detachInput(); // Free up the key and bags reference - - BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); - for (Object bloomKey: bloomKeys) { - Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType)); - bloomFilter.add(k); - } - bloomKeys = null; - return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64); - - } - private Result getSerializedBloomFilter(Object partition, BloomFilter bloomFilter, int serializedSize) throws ExecException, IOException { @@ -159,26 +136,28 @@ public class BloomPackager extends Packa return r; } - private Result getDistinctBloomKeys() throws ExecException { - if (distinctKeyIter == null) { - HashSet<Object> bloomKeys = new HashSet<>(); - Iterator<Tuple> iter = bags[0].iterator(); - while (iter.hasNext()) { - bloomKeys.add(iter.next().get(0)); + private void addKeyToBloomFilter(Object key, int partition) throws ExecException { + Key k = new Key(((DataByteArray)key).get()); + BloomFilter filter = bloomFilters[partition]; + if (filter == null) { + filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + bloomFilters[partition] = filter; + } + filter.add(k); + } + + private Result retrieveBloomFilter() throws IOException { + while (nextFilterIdx < numBloomFilters) { + if (bloomFilters[nextFilterIdx] != null) { + return getSerializedBloomFilter(nextFilterIdx, bloomFilters[nextFilterIdx++], vectorSizeBytes + 64); + } else { + nextFilterIdx++; } - distinctKeyIter = bloomKeys.iterator(); } - while (distinctKeyIter.hasNext()) { - Tuple res = mTupleFactory.newTuple(2); - res.set(0, key); - res.set(1, distinctKeyIter.next()); - - Result r = new Result(); - r.result = res; - r.returnStatus = POStatus.STATUS_OK; - return r; - } - distinctKeyIter = null; - return new Result(POStatus.STATUS_EOP, null); + return RESULT_EOP; + } + + public boolean isBloomCreatedInMap() { + return bloomCreatedInMap; } -} +} \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Wed Oct 3 22:39:09 2018 @@ -37,6 +37,7 @@ import org.apache.pig.classification.Int import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableBytesWritable; import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -75,8 +76,7 @@ public class POBuildBloomRearrangeTez ex private transient BloomFilter[] bloomFilters; private transient KeyValueWriter bloomWriter; private transient PigNullableWritable nullKey; - private transient Tuple bloomValue; - private transient NullableTuple bloomNullableTuple; + private transient NullableTuple[] bloomPartitions; public POBuildBloomRearrangeTez(POLocalRearrangeTez lr, 
boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes, @@ -142,8 +142,7 @@ public class POBuildBloomRearrangeTez ex throw new ExecException(e); } bloomFilters = new BloomFilter[numBloomFilters]; - bloomValue = mTupleFactory.newTuple(1); - bloomNullableTuple = new NullableTuple(bloomValue); + bloomPartitions = new NullableTuple[numBloomFilters]; } @Override @@ -167,7 +166,7 @@ public class POBuildBloomRearrangeTez ex if (createBloomInMap) { addKeyToBloomFilter(keyObj); } else { - writeJoinKeyForBloom(keyObj); + writeJoinKeyForBloom(keyObj, key); } } else if (skipNullKeys) { // Inner join. So don't bother writing null key @@ -225,21 +224,27 @@ public class POBuildBloomRearrangeTez ex } } - private void writeJoinKeyForBloom(Object key) throws IOException { - int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; - bloomValue.set(0, key); - bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple); + private void writeJoinKeyForBloom(Object keyObj, PigNullableWritable key) throws IOException { + int partition = (keyObj.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + if (bloomPartitions[partition] == null) { + Tuple tuple = mTupleFactory.newTuple(1); + tuple.set(0, partition); + bloomPartitions[partition] = new NullableTuple(tuple); + } + bloomWriter.write(new NullableBytesWritable(new DataByteArray(DataType.toBytes(keyObj, keyType))), bloomPartitions[partition]); } private void writeBloomFilters() throws IOException { + Tuple tuple = mTupleFactory.newTuple(1); + NullableTuple nTuple = new NullableTuple(tuple); ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64); for (int i = 0; i < bloomFilters.length; i++) { if (bloomFilters[i] != null) { DataOutputStream dos = new DataOutputStream(baos); bloomFilters[i].write(dos); dos.flush(); - bloomValue.set(0, new DataByteArray(baos.toByteArray())); - bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple); + tuple.set(0, new DataByteArray(baos.toByteArray())); + bloomWriter.write(new NullableIntWritable(i), nTuple); baos.reset(); } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Wed Oct 3 22:39:09 2018 @@ -188,7 +188,7 @@ public class POShuffleTezLoad extends PO if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) { this.parentPlan.endOfAllInput = true; } - return RESULT_EOP; + return pkgr.getNext(); } key = pkgr.getKey(min); Modified: pig/trunk/test/e2e/pig/tests/join.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/join.conf?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/join.conf (original) +++ pig/trunk/test/e2e/pig/tests/join.conf Wed Oct 3 22:39:09 2018 @@ -28,7 +28,9 @@ $cfg = { { # Tuple join key 'num' => 1, - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); + 'pig' => q\ +SET pig.bloomjoin.num.filters 5; +a = load 
':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); --c = filter a by age < 20; --d = filter b by age < 20; @@ -171,7 +173,9 @@ store e into ':OUTPATH:.2';\, { # Tuple join key 'num' => 1, - 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); + 'pig' => q\ +SET pig.bloomjoin.num.filters 5; +a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa); b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions); --c = filter a by age < 20; --d = filter b by age < 20; Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -27,15 +27,11 @@ d: BuildBloom Rearrange[tuple]{bytearray |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-14 Tez vertex scope-50 # Combine plan on edge <scope-48> -Local Rearrange[tuple]{int}(false) - scope-55 -> scope-50 -| | -| Project[int][0] - scope-54 -| -|---Package(BloomPackager)[tuple]{int} - scope-53 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-52 -> [scope-46, scope-47] | -|---Package(BloomPackager)[tuple]{int} - scope-51 +|---Package(BloomPackager)[tuple]{bytearray} - scope-51 Tez vertex scope-46 # Plan on vertex d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26 <- scope-50 -> scope-49 Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -28,15 +28,11 @@ d: BuildBloom Rearrange[tuple]{chararray |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0 Tez vertex scope-42 # Combine plan on edge <scope-39> -Local Rearrange[tuple]{int}(false) - scope-47 -> scope-42 -| | -| Project[int][0] - scope-46 -| -|---Package(BloomPackager)[tuple]{int} - scope-45 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-44 -> [scope-40] | -|---Package(BloomPackager)[tuple]{int} - scope-43 +|---Package(BloomPackager)[tuple]{bytearray} - scope-43 Tez vertex scope-40 # Plan on vertex d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22 <- scope-42 -> scope-41 Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff 
============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -4,19 +4,19 @@ #-------------------------------------------------- # TEZ DAG plan: pig-0_scope-0 #-------------------------------------------------- -Tez vertex scope-45 -> Tez vertex group scope-58,Tez vertex group scope-59, -Tez vertex scope-46 -> Tez vertex group scope-58,Tez vertex group scope-59, -Tez vertex group scope-59 -> Tez vertex scope-52, +Tez vertex scope-45 -> Tez vertex group scope-55,Tez vertex group scope-56, +Tez vertex scope-46 -> Tez vertex group scope-55,Tez vertex group scope-56, +Tez vertex group scope-56 -> Tez vertex scope-52, Tez vertex scope-52 -> Tez vertex scope-44, Tez vertex scope-44 -> Tez vertex scope-51, -Tez vertex group scope-58 -> Tez vertex scope-51, +Tez vertex group scope-55 -> Tez vertex scope-51, Tez vertex scope-51 Tez vertex scope-45 # Plan on vertex -d: BuildBloom Rearrange[tuple]{int}(false) - scope-60 -> [ scope-51, scope-52] +d: BuildBloom Rearrange[tuple]{int}(false) - scope-57 -> [ scope-51, scope-52] | | -| Project[int][0] - scope-61 +| Project[int][0] - scope-58 | |---b: New For Each(false,false)[bag] - scope-15 | | @@ -31,9 +31,9 @@ d: BuildBloom Rearrange[tuple]{int}(fals |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8 Tez vertex scope-46 # Plan on vertex -d: BuildBloom Rearrange[tuple]{int}(false) - scope-62 -> [ scope-51, scope-52] +d: BuildBloom Rearrange[tuple]{int}(false) - scope-59 -> [ scope-51, scope-52] | | -| Project[int][0] - scope-63 +| Project[int][0] - scope-60 | |---c: New For Each(false,false)[bag] - scope-23 | | @@ -46,25 +46,17 @@ d: BuildBloom Rearrange[tuple]{int}(fals | |---Project[bytearray][1] - scope-20 | |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - scope-16 -Tez vertex group scope-59 <- [scope-45, scope-46] -> scope-52 +Tez vertex group scope-56 <- [scope-45, scope-46] -> scope-52 # No plan on vertex group Tez vertex scope-52 # Combine plan on edge <scope-45> -Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52 -| | -| Project[int][0] - scope-56 -| -|---Package(BloomPackager)[tuple]{int} - scope-55 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Combine plan on edge <scope-46> -Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52 -| | -| Project[int][0] - scope-56 -| -|---Package(BloomPackager)[tuple]{int} - scope-55 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-54 -> [scope-44] | -|---Package(BloomPackager)[tuple]{int} - scope-53 +|---Package(BloomPackager)[tuple]{bytearray} - scope-53 Tez vertex scope-44 # Plan on vertex d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <- scope-52 -> scope-51 @@ -82,7 +74,7 @@ d: BloomFilter Rearrange[tuple]{int}(fal | |---Project[bytearray][1] - scope-4 | |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0 -Tez vertex group scope-58 <- [scope-45, scope-46] -> scope-51 +Tez vertex group scope-55 <- [scope-45, scope-46] -> scope-51 # No plan on vertex group Tez vertex scope-51 # Plan on vertex Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -60,15 +60,11 @@ d: BuildBloom Rearrange[tuple]{int}(fals |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-17 Tez vertex scope-52 # Combine plan on edge <scope-50> -Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52 -| | -| Project[int][0] - scope-56 -| -|---Package(BloomPackager)[tuple]{int} - scope-55 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-54 -> [scope-46] | -|---Package(BloomPackager)[tuple]{int} - scope-53 +|---Package(BloomPackager)[tuple]{bytearray} - scope-53 Tez vertex scope-46 # Plan on vertex d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <- scope-52 -> scope-51 Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -29,18 +29,14 @@ d: BuildBloom Rearrange[tuple]{int}(fals |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-21 Tez vertex scope-62 # Combine plan on edge <scope-60> -Local Rearrange[tuple]{int}(false) - scope-67 -> scope-62 -| | -| Project[int][0] - scope-66 -| -|---Package(BloomPackager)[tuple]{int} - scope-65 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-64 -> [scope-54, scope-58] | -|---Package(BloomPackager)[tuple]{int} - scope-63 +|---Package(BloomPackager)[tuple]{bytearray} - scope-63 Tez vertex scope-54 # Plan on vertex -a: Split - scope-68 +a: Split - scope-65 | | | d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-62 -> scope-61 | | | Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -11,7 +11,7 @@ Tez vertex scope-56 Tez vertex scope-49 # Plan on vertex -a: Split - scope-63 +a: Split - scope-60 | | | a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - scope-15 | | @@ -48,15 +48,11 @@ a: Split - scope-63 |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0 Tez vertex scope-57 # Combine plan on edge <scope-49> -Local Rearrange[tuple]{int}(false) - scope-62 -> scope-57 -| | -| Project[int][0] 
- scope-61 -| -|---Package(BloomPackager)[tuple]{int} - scope-60 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-59 -> [scope-53] | -|---Package(BloomPackager)[tuple]{int} - scope-58 +|---Package(BloomPackager)[tuple]{bytearray} - scope-58 Tez vertex scope-53 # Plan on vertex d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <- scope-57 -> scope-56 Modified: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld (original) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld Wed Oct 3 22:39:09 2018 @@ -12,7 +12,7 @@ Tez vertex scope-51 Tez vertex scope-43 # Plan on vertex -a: Split - scope-58 +a: Split - scope-55 | | | e: BuildBloom Rearrange[tuple]{int}(false) - scope-36 -> [ scope-51, scope-52] | | | @@ -41,15 +41,11 @@ a: Split - scope-58 |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0 Tez vertex scope-52 # Combine plan on edge <scope-43> -Local Rearrange[tuple]{int}(false) - scope-57 -> scope-52 -| | -| Project[int][0] - scope-56 -| -|---Package(BloomPackager)[tuple]{int} - scope-55 +org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine # Plan on vertex POValueOutputTez - scope-54 -> [scope-45, scope-47] | -|---Package(BloomPackager)[tuple]{int} - scope-53 +|---Package(BloomPackager)[tuple]{bytearray} - scope-53 Tez vertex scope-45 # Plan on vertex e: BloomFilter Rearrange[tuple]{int}(false) - scope-32 <- scope-52 -> scope-51
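For reference, the bloom filter handling that POBuildBloomRearrangeTez and BloomPackager implement can be sketched with the Hadoop bloom filter classes this patch uses. The sketch below is illustrative and not part of the commit: the string keys, the filter count of 5, and the class and variable names are assumptions, while the partition formula, the BloomFilter construction (vector size in bits), and the defaults of a 1 MB vector, 3 hash functions and murmur hashing come from the code and documentation above.

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

public class BloomJoinFilterSketch {
    public static void main(String[] args) {
        int vectorSizeBytes = 1048576; // pig.bloomjoin.vectorsize.bytes default
        int numHash = 3;               // pig.bloomjoin.hash.functions default
        int numBloomFilters = 5;       // illustrative pig.bloomjoin.num.filters value
        BloomFilter[] filters = new BloomFilter[numBloomFilters];

        // Build side: each join key is routed to one filter, using the same
        // partition formula as POBuildBloomRearrangeTez.writeJoinKeyForBloom.
        for (String joinKey : new String[] { "alice", "bob", "carol" }) {
            int buildPartition = (joinKey.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
            if (filters[buildPartition] == null) {
                // BloomFilter takes the vector size in bits, as in BloomPackager.
                filters[buildPartition] = new BloomFilter(vectorSizeBytes * 8, numHash, Hash.MURMUR_HASH);
            }
            filters[buildPartition].add(new Key(joinKey.getBytes()));
        }

        // Probe side: a record is shuffled to the join only if its key may be present.
        String probeKey = "bob";
        int probePartition = (probeKey.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
        boolean mayMatch = filters[probePartition] != null
                && filters[probePartition].membershipTest(new Key(probeKey.getBytes()));
        System.out.println(probeKey + " may match: " + mayMatch);

        // With the map strategy, every map ships partial filters and the reducer ORs the
        // partials for the same partition together, as BloomPackager.combineBloomFilters does.
        String lateKey = "dave";
        int latePartition = (lateKey.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
        BloomFilter partial = new BloomFilter(vectorSizeBytes * 8, numHash, Hash.MURMUR_HASH);
        partial.add(new Key(lateKey.getBytes()));
        if (filters[latePartition] == null) {
            filters[latePartition] = partial;
        } else {
            filters[latePartition].or(partial);
        }
    }
}

Dropping non-matching records before the shuffle is what reduces the data sent to the join reducers when only a small fraction of probe keys actually match.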