Author: rohini
Date: Wed Oct  3 22:39:09 2018
New Revision: 1842768

URL: http://svn.apache.org/viewvc?rev=1842768&view=rev
Log:
PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
    pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/test/e2e/pig/tests/join.conf
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Oct  3 22:39:09 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-5342: Add setting to turn off bloom join combiner (satishsaley via rohini)
+
 PIG-5349: Log stderr output when shell command fail (knoguchi)
 
 PIG-3038: Support for Credentials for UDF,Loader and Storer (satishsaley via 
rohini)

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/perf.xml Wed Oct  3 
22:39:09 2018
@@ -1210,7 +1210,8 @@ gets 1 GB of memory. Please share your o
 used to filter records of the other relations before doing a regular hash join.
 The amount of data sent to the reducers will be a lot less depending up on the 
numbers of records that are filtered on the map side.
 Bloom join is very useful in cases where the number of matching records 
between relations in a join are comparatively less
-compared to the total records allowing many to be filtered before the join.
+compared to the total records allowing many to be filtered before the join. 
Bloom join is also ideal in cases of right outer join
+with smaller dataset on the right which is not supported by replicated join.
 Before bloom join was added as a type of join, same functionality was achieved 
by users by using
 the <a href="func.html#bloom">builtin bloom udfs</a> which is not as efficient 
and required more lines of code as well.
 Currently bloom join is only implemented in Tez execution mode. Builtin bloom 
udfs have to be used for other execution modes.</p>
@@ -1287,9 +1288,10 @@ In this case size of keys sent to the re
 <li>pig.bloomjoin.num.filters - The number of bloom filters that will be 
created. Default is 1 for map strategy and 11 for reduce strategy.</li>
 <li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be 
used for the bloom filter.
 A bigger vector size will be needed when the number of distinct keys is 
higher. Default value is 1048576 (1MB).</li>
-<li>pig.bloomjoin.hash.functions - The type of hash function to use. Valid 
values are 'jenkins' and 'murmur'. Default is murmur.</li>
-<li>pig.bloomjoin.hash.types - The number of hash functions to be used in 
bloom computation. It determines the probability of false positives.
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values 
are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in 
bloom computation. It determines the probability of false positives.
 Higher the number lower the false positives. Too high a value can increase the 
cpu time. Default value is 3.</li>
+<li>pig.bloomjoin.nocombiner - To turn off combiner when most of the keys are 
unique. Default is false.</li>
 </ul>
 </section>
 

Modified: pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/trunk/src/docs/src/documentation/content/xdocs/udf.xml Wed Oct  3 
22:39:09 2018
@@ -186,7 +186,7 @@ public class COUNT extends EvalFunc&lt;L
         public Tuple exec(Tuple input) throws IOException {return 
TupleFactory.getInstance().newTuple(sum(input));}
     }
     static public class Final extends EvalFunc&lt;Long&gt; {
-        public Tuple exec(Tuple input) throws IOException {return sum(input);}
+        public Long exec(Tuple input) throws IOException {return sum(input);}
     }
     static protected Long count(Tuple input) throws ExecException {
         Object values = input.get(0);

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Wed Oct  3 22:39:09 2018
@@ -204,6 +204,10 @@ public class PigConfiguration {
     public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = 
"pig.bloomjoin.hash.functions";
 
     /**
+     * To turn off combiner when most of the keys are unique.
+     */
+    public static final String PIG_BLOOMJOIN_NOCOMBINER = 
"pig.bloomjoin.nocombiner";
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
 Wed Oct  3 22:39:09 2018
@@ -26,10 +26,12 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -132,6 +134,12 @@ public class EndOfAllInputSetter extends
             super.visitLocalRearrange(lr);
         }
 
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException {
+            if (pkg.getPkgr() instanceof BloomPackager) {
+                endOfAllInputFlag = true;
+            }
+        }
         /**
          * @return if end of all input is present
          */

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Wed Oct  3 22:39:09 2018
@@ -49,6 +49,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
@@ -109,6 +110,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.BloomFilterPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.WeightedRangePartitionerTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -119,6 +121,7 @@ import org.apache.pig.impl.builtin.Parti
 import org.apache.pig.impl.builtin.TezIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -134,6 +137,7 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -1452,46 +1456,45 @@ public class TezCompiler extends PhyPlan
 
         POPackage pkg = new POPackage(OperatorKey.genOpKey(scope));
         pkg.setNumInps(1);
-        BloomPackager pkgr = new BloomPackager(createBloomInMap, 
vectorSizeBytes, numHash, hashType);;
-        pkgr.setKeyType(DataType.INTEGER);
+        BloomPackager pkgr = new BloomPackager(createBloomInMap, 
numBloomFilters, vectorSizeBytes, numHash, hashType);
         pkg.setPkgr(pkgr);
         POValueOutputTez combineBloomOutput = new 
POValueOutputTez(OperatorKey.genOpKey(scope));
         combineBloomOp.plan.addAsLeaf(pkg);
         combineBloomOp.plan.addAsLeaf(combineBloomOutput);
 
-        
edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
-        
edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigIntRawBytesComparator.class.getName());
-
-        // Add combiner as well.
-        POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
-        BloomPackager combinerPkgr = new BloomPackager(createBloomInMap, 
vectorSizeBytes, numHash, hashType);
-        combinerPkgr.setCombiner(true);
-        combinerPkgr.setKeyType(DataType.INTEGER);
-        pkg_c.setPkgr(combinerPkgr);
-        pkg_c.setNumInps(1);
-        edge.combinePlan.addAsLeaf(pkg_c);
-        POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
-        prjKey.setResultType(DataType.INTEGER);
-        List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
-        PhysicalPlan pp = new PhysicalPlan();
-        pp.add(prjKey);
-        clrInps.add(pp);
-        POLocalRearrangeTez clr = localRearrangeFactory.create(0, 
LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
-        clr.setOutputKey(combineBloomOpKey);
-        edge.combinePlan.addAsLeaf(clr);
-
         if (createBloomInMap) {
+            pkgr.setKeyType(DataType.INTEGER);
+            
edge.setIntermediateOutputKeyClass(NullableIntWritable.class.getName());
+            edge.setIntermediateOutputKeyComparatorClass(
+            PigWritableComparators.PigIntRawBytesComparator.class.getName());
+            // Add combiner as well. Each of the bloom filter is 1 MB by 
default. When there are
+            // 100s of mappers producing bloom filter, it is better to have 
combiner
+            // on the reduce side.
+            POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
+            pkg_c.setPkgr(new BloomPackager(createBloomInMap, numBloomFilters, 
vectorSizeBytes, numHash, hashType));
+            pkg_c.getPkgr().setKeyType(DataType.INTEGER);
+            pkg_c.setNumInps(1);
+            edge.combinePlan.addAsLeaf(pkg_c);
+            POProject prjKey = new POProject(OperatorKey.genOpKey(scope));
+            prjKey.setResultType(DataType.INTEGER);
+            List<PhysicalPlan> clrInps = new ArrayList<PhysicalPlan>();
+            PhysicalPlan pp = new PhysicalPlan();
+            pp.add(prjKey);
+            clrInps.add(pp);
+            POLocalRearrangeTez clr = localRearrangeFactory.create(0, 
LocalRearrangeType.WITHPLAN, clrInps, DataType.INTEGER);
+            clr.setOutputKey(combineBloomOpKey);
+            edge.combinePlan.addAsLeaf(clr);
             // No combiner needed on map as there will be only one bloom 
filter per map for each partition
             // In the reducer, the bloom filters will be combined with same 
logic of reduce in BloomPackager
             edge.setCombinerInMap(false);
             edge.setCombinerInReducer(true);
         } else {
-            pkgr.setBloomKeyType(op.getPkgr().getKeyType());
+            pkgr.setKeyType(DataType.BYTEARRAY);
+            
edge.setIntermediateOutputKeyClass(NullableBytesWritable.class.getName());
+            
edge.setIntermediateOutputKeyComparatorClass(PigWritableComparators.PigBytesRawBytesComparator.class.getName());
+            edge.partitionerClass = BloomFilterPartitioner.class;
             // Do distinct of the keys on the map side to reduce data sent to 
reducers.
-            // In case of reduce, not adding a combiner and doing the distinct 
during reduce itself.
-            // If needed one can be added later
-            edge.setCombinerInMap(true);
-            edge.setCombinerInReducer(false);
+            
edge.setNeedsDistinctCombiner(!conf.getBoolean(PigConfiguration.PIG_BLOOMJOIN_NOCOMBINER,
 false));
         }
 
         // Broadcast the final bloom filter to other inputs

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
 Wed Oct  3 22:39:09 2018
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.Iterator;
 
 import org.apache.hadoop.util.bloom.BloomFilter;
@@ -38,32 +37,27 @@ import org.apache.pig.data.Tuple;
 public class BloomPackager extends Packager {
 
     private static final long serialVersionUID = 1L;
+    private static final Result RESULT_EMPTY = new 
Result(POStatus.STATUS_NULL, null);
+    private static final Result RESULT_EOP = new Result(POStatus.STATUS_EOP, 
null);
 
     private boolean bloomCreatedInMap;
     private int vectorSizeBytes;
+    private int numBloomFilters;
     private int numHash;
     private int hashType;
-    private byte bloomKeyType;
-    private boolean isCombiner;
 
     private transient ByteArrayOutputStream baos;
-    private transient Iterator<Object> distinctKeyIter;
+    private transient BloomFilter[] bloomFilters;
+    private transient int nextFilterIdx;
 
-    public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes,
+    public BloomPackager(boolean bloomCreatedInMap, int numBloomFilters, int 
vectorSizeBytes,
             int numHash, int hashType) {
         super();
         this.bloomCreatedInMap = bloomCreatedInMap;
         this.vectorSizeBytes = vectorSizeBytes;
         this.numHash = numHash;
         this.hashType = hashType;
-    }
-
-    public void setBloomKeyType(byte keyType) {
-        bloomKeyType = keyType;
-    }
-
-    public void setCombiner(boolean isCombiner) {
-        this.isCombiner = isCombiner;
+        this.numBloomFilters = numBloomFilters;
     }
 
     @Override
@@ -78,21 +72,26 @@ public class BloomPackager extends Packa
     @Override
     public Result getNext() throws ExecException {
         try {
+            if (bags == null) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
             if (bloomCreatedInMap) {
-                if (bags == null) {
-                    return new Result(POStatus.STATUS_EOP, null);
-                }
-                // Same function for combiner and reducer
                 return combineBloomFilters();
             } else {
-                if (isCombiner) {
-                    return getDistinctBloomKeys();
-                } else {
-                    if (bags == null) {
-                        return new Result(POStatus.STATUS_EOP, null);
-                    }
-                    return createBloomFilter();
+                if (parent.isEndOfAllInput()) {
+                    return retrieveBloomFilter();
                 }
+                if (!bags[0].iterator().hasNext()) {
+                    return new Result(POStatus.STATUS_EOP, null);
+                }
+                if (bloomFilters == null) { // init
+                    bloomFilters = new BloomFilter[numBloomFilters];
+                }
+                // Create the bloom filters from the keys
+                Tuple tup = bags[0].iterator().next();
+                addKeyToBloomFilter(key, (int) tup.get(0));
+                detachInput();
+                return RESULT_EMPTY;
             }
         } catch (IOException e) {
             throw new ExecException("Error while constructing final bloom 
filter", e);
@@ -116,28 +115,6 @@ public class BloomPackager extends Packa
         return getSerializedBloomFilter(partition, bloomFilter, 
bloomBytes.get().length);
     }
 
-    private Result createBloomFilter() throws IOException {
-        // We get a bag of keys. Create a bloom filter from them
-        // First do distinct of the keys. Not using DistinctBag as memory 
should not be a problem.
-        HashSet<Object> bloomKeys = new HashSet<>();
-        Iterator<Tuple> iter = bags[0].iterator();
-        while (iter.hasNext()) {
-            bloomKeys.add(iter.next().get(0));
-        }
-
-        Object partition = key;
-        detachInput(); // Free up the key and bags reference
-
-        BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, 
numHash, hashType);
-        for (Object bloomKey: bloomKeys) {
-            Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType));
-            bloomFilter.add(k);
-        }
-        bloomKeys = null;
-        return getSerializedBloomFilter(partition, bloomFilter, 
vectorSizeBytes + 64);
-
-    }
-
     private Result getSerializedBloomFilter(Object partition,
             BloomFilter bloomFilter, int serializedSize) throws ExecException,
             IOException {
@@ -159,26 +136,28 @@ public class BloomPackager extends Packa
         return r;
     }
 
-    private Result getDistinctBloomKeys() throws ExecException {
-        if (distinctKeyIter == null) {
-            HashSet<Object> bloomKeys = new HashSet<>();
-            Iterator<Tuple> iter = bags[0].iterator();
-            while (iter.hasNext()) {
-                bloomKeys.add(iter.next().get(0));
+    private void addKeyToBloomFilter(Object key, int partition) throws 
ExecException {
+        Key k = new Key(((DataByteArray)key).get());
+        BloomFilter filter = bloomFilters[partition];
+        if (filter == null) {
+            filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
+            bloomFilters[partition] = filter;
+        }
+        filter.add(k);
+    }
+
+    private Result retrieveBloomFilter() throws IOException  {
+        while (nextFilterIdx < numBloomFilters) {
+            if (bloomFilters[nextFilterIdx] != null) {
+                return getSerializedBloomFilter(nextFilterIdx, 
bloomFilters[nextFilterIdx++], vectorSizeBytes + 64);
+            } else {
+                nextFilterIdx++;
             }
-            distinctKeyIter = bloomKeys.iterator();
         }
-        while (distinctKeyIter.hasNext()) {
-            Tuple res = mTupleFactory.newTuple(2);
-            res.set(0, key);
-            res.set(1, distinctKeyIter.next());
-
-            Result r = new Result();
-            r.result = res;
-            r.returnStatus = POStatus.STATUS_OK;
-            return r;
-        }
-        distinctKeyIter = null;
-        return new Result(POStatus.STATUS_EOP, null);
+        return RESULT_EOP;
+    }
+
+    public boolean isBloomCreatedInMap() {
+        return bloomCreatedInMap;
     }
-}
+}
\ No newline at end of file

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
 Wed Oct  3 22:39:09 2018
@@ -37,6 +37,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -75,8 +76,7 @@ public class POBuildBloomRearrangeTez ex
     private transient BloomFilter[] bloomFilters;
     private transient KeyValueWriter bloomWriter;
     private transient PigNullableWritable nullKey;
-    private transient Tuple bloomValue;
-    private transient NullableTuple bloomNullableTuple;
+    private transient NullableTuple[] bloomPartitions;
 
     public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
             boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
@@ -142,8 +142,7 @@ public class POBuildBloomRearrangeTez ex
             throw new ExecException(e);
         }
         bloomFilters = new BloomFilter[numBloomFilters];
-        bloomValue = mTupleFactory.newTuple(1);
-        bloomNullableTuple = new NullableTuple(bloomValue);
+        bloomPartitions = new NullableTuple[numBloomFilters];
     }
 
     @Override
@@ -167,7 +166,7 @@ public class POBuildBloomRearrangeTez ex
                             if (createBloomInMap) {
                                 addKeyToBloomFilter(keyObj);
                             } else {
-                                writeJoinKeyForBloom(keyObj);
+                                writeJoinKeyForBloom(keyObj, key);
                             }
                         } else if (skipNullKeys) {
                             // Inner join. So don't bother writing null key
@@ -225,21 +224,27 @@ public class POBuildBloomRearrangeTez ex
         }
     }
 
-    private void writeJoinKeyForBloom(Object key) throws IOException {
-        int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
-        bloomValue.set(0, key);
-        bloomWriter.write(new NullableIntWritable(partition), 
bloomNullableTuple);
+    private void writeJoinKeyForBloom(Object keyObj, PigNullableWritable key) 
throws IOException {
+        int partition = (keyObj.hashCode() & Integer.MAX_VALUE) % 
numBloomFilters;
+        if (bloomPartitions[partition] == null) {
+            Tuple tuple = mTupleFactory.newTuple(1);
+            tuple.set(0, partition);
+            bloomPartitions[partition] = new NullableTuple(tuple);
+        }
+        bloomWriter.write(new NullableBytesWritable(new 
DataByteArray(DataType.toBytes(keyObj, keyType))), bloomPartitions[partition]);
     }
 
     private void writeBloomFilters() throws IOException {
+        Tuple tuple = mTupleFactory.newTuple(1);
+        NullableTuple nTuple = new NullableTuple(tuple);
         ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes 
+ 64);
         for (int i = 0; i < bloomFilters.length; i++) {
             if (bloomFilters[i] != null) {
                 DataOutputStream dos = new DataOutputStream(baos);
                 bloomFilters[i].write(dos);
                 dos.flush();
-                bloomValue.set(0, new DataByteArray(baos.toByteArray()));
-                bloomWriter.write(new NullableIntWritable(i), 
bloomNullableTuple);
+                tuple.set(0, new DataByteArray(baos.toByteArray()));
+                bloomWriter.write(new NullableIntWritable(i), nTuple);
                 baos.reset();
             }
         }

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
 Wed Oct  3 22:39:09 2018
@@ -188,7 +188,7 @@ public class POShuffleTezLoad extends PO
                 if 
(Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
                     this.parentPlan.endOfAllInput = true;
                 }
-                return RESULT_EOP;
+                return pkgr.getNext();
             }
 
             key = pkgr.getKey(min);

Modified: pig/trunk/test/e2e/pig/tests/join.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/join.conf?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/join.conf (original)
+++ pig/trunk/test/e2e/pig/tests/join.conf Wed Oct  3 22:39:09 2018
@@ -28,7 +28,9 @@ $cfg = {
             {
             # Tuple join key
             'num' => 1,
-            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as 
(name, age, gpa);
+            'pig' => q\
+SET pig.bloomjoin.num.filters 5;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, 
contributions);
 --c = filter a by age < 20;
 --d = filter b by age < 20;
@@ -171,7 +173,9 @@ store e into ':OUTPATH:.2';\,
             {
             # Tuple join key
             'num' => 1,
-            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as 
(name, age, gpa);
+            'pig' => q\
+SET pig.bloomjoin.num.filters 5;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, 
contributions);
 --c = filter a by age < 20;
 --d = filter b by age < 20;

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-1-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -27,15 +27,11 @@ d: BuildBloom Rearrange[tuple]{bytearray
     |---c: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - 
scope-14
 Tez vertex scope-50
 # Combine plan on edge <scope-48>
-Local Rearrange[tuple]{int}(false) - scope-55  ->       scope-50
-|   |
-|   Project[int][0] - scope-54
-|
-|---Package(BloomPackager)[tuple]{int} - scope-53
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-52    ->       [scope-46, scope-47]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-51
+|---Package(BloomPackager)[tuple]{bytearray} - scope-51
 Tez vertex scope-46
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{bytearray}(false) - scope-26   <-       
scope-50       ->       scope-49

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-2-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -28,15 +28,11 @@ d: BuildBloom Rearrange[tuple]{chararray
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-42
 # Combine plan on edge <scope-39>
-Local Rearrange[tuple]{int}(false) - scope-47  ->       scope-42
-|   |
-|   Project[int][0] - scope-46
-|
-|---Package(BloomPackager)[tuple]{int} - scope-45
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-44    ->       [scope-40]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-43
+|---Package(BloomPackager)[tuple]{bytearray} - scope-43
 Tez vertex scope-40
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{chararray}(false) - scope-22   <-       
scope-42       ->       scope-41

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-3-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -4,19 +4,19 @@
 #--------------------------------------------------
 # TEZ DAG plan: pig-0_scope-0
 #--------------------------------------------------
-Tez vertex scope-45    ->      Tez vertex group scope-58,Tez vertex group 
scope-59,
-Tez vertex scope-46    ->      Tez vertex group scope-58,Tez vertex group 
scope-59,
-Tez vertex group scope-59      ->      Tez vertex scope-52,
+Tez vertex scope-45    ->      Tez vertex group scope-55,Tez vertex group 
scope-56,
+Tez vertex scope-46    ->      Tez vertex group scope-55,Tez vertex group 
scope-56,
+Tez vertex group scope-56      ->      Tez vertex scope-52,
 Tez vertex scope-52    ->      Tez vertex scope-44,
 Tez vertex scope-44    ->      Tez vertex scope-51,
-Tez vertex group scope-58      ->      Tez vertex scope-51,
+Tez vertex group scope-55      ->      Tez vertex scope-51,
 Tez vertex scope-51
 
 Tez vertex scope-45
 # Plan on vertex
-d: BuildBloom Rearrange[tuple]{int}(false) - scope-60  ->      [ scope-51, 
scope-52]
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-57  ->      [ scope-51, 
scope-52]
 |   |
-|   Project[int][0] - scope-61
+|   Project[int][0] - scope-58
 |
 |---b: New For Each(false,false)[bag] - scope-15
     |   |
@@ -31,9 +31,9 @@ d: BuildBloom Rearrange[tuple]{int}(fals
     |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - scope-8
 Tez vertex scope-46
 # Plan on vertex
-d: BuildBloom Rearrange[tuple]{int}(false) - scope-62  ->      [ scope-51, 
scope-52]
+d: BuildBloom Rearrange[tuple]{int}(false) - scope-59  ->      [ scope-51, 
scope-52]
 |   |
-|   Project[int][0] - scope-63
+|   Project[int][0] - scope-60
 |
 |---c: New For Each(false,false)[bag] - scope-23
     |   |
@@ -46,25 +46,17 @@ d: BuildBloom Rearrange[tuple]{int}(fals
     |   |---Project[bytearray][1] - scope-20
     |
     |---c: Load(file:///tmp/input3:org.apache.pig.builtin.PigStorage) - 
scope-16
-Tez vertex group scope-59      <-       [scope-45, scope-46]   ->       
scope-52
+Tez vertex group scope-56      <-       [scope-45, scope-46]   ->       
scope-52
 # No plan on vertex group
 Tez vertex scope-52
 # Combine plan on edge <scope-45>
-Local Rearrange[tuple]{int}(false) - scope-57  ->       scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Combine plan on edge <scope-46>
-Local Rearrange[tuple]{int}(false) - scope-57  ->       scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-54    ->       [scope-44]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-53
+|---Package(BloomPackager)[tuple]{bytearray} - scope-53
 Tez vertex scope-44
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <-       scope-52       
->       scope-51
@@ -82,7 +74,7 @@ d: BloomFilter Rearrange[tuple]{int}(fal
     |   |---Project[bytearray][1] - scope-4
     |
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
-Tez vertex group scope-58      <-       [scope-45, scope-46]   ->       
scope-51
+Tez vertex group scope-55      <-       [scope-45, scope-46]   ->       
scope-51
 # No plan on vertex group
 Tez vertex scope-51
 # Plan on vertex

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-4-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -60,15 +60,11 @@ d: BuildBloom Rearrange[tuple]{int}(fals
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - 
scope-17
 Tez vertex scope-52
 # Combine plan on edge <scope-50>
-Local Rearrange[tuple]{int}(false) - scope-57  ->       scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-54    ->       [scope-46]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-53
+|---Package(BloomPackager)[tuple]{bytearray} - scope-53
 Tez vertex scope-46
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{int}(false) - scope-29 <-       scope-52       
->       scope-51

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-5-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -29,18 +29,14 @@ d: BuildBloom Rearrange[tuple]{int}(fals
     |---b: Load(file:///tmp/input2:org.apache.pig.builtin.PigStorage) - 
scope-21
 Tez vertex scope-62
 # Combine plan on edge <scope-60>
-Local Rearrange[tuple]{int}(false) - scope-67  ->       scope-62
-|   |
-|   Project[int][0] - scope-66
-|
-|---Package(BloomPackager)[tuple]{int} - scope-65
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-64    ->       [scope-54, scope-58]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-63
+|---Package(BloomPackager)[tuple]{bytearray} - scope-63
 Tez vertex scope-54
 # Plan on vertex
-a: Split - scope-68
+a: Split - scope-65
 |   |
 |   d: BloomFilter Rearrange[tuple]{int}(false) - scope-34     <-       
scope-62       ->       scope-61
 |   |   |

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-6-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -11,7 +11,7 @@ Tez vertex scope-56
 
 Tez vertex scope-49
 # Plan on vertex
-a: Split - scope-63
+a: Split - scope-60
 |   |
 |   a2: Store(file:///tmp/pigoutput/a2:org.apache.pig.builtin.PigStorage) - 
scope-15
 |   |
@@ -48,15 +48,11 @@ a: Split - scope-63
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-57
 # Combine plan on edge <scope-49>
-Local Rearrange[tuple]{int}(false) - scope-62  ->       scope-57
-|   |
-|   Project[int][0] - scope-61
-|
-|---Package(BloomPackager)[tuple]{int} - scope-60
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-59    ->       [scope-53]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-58
+|---Package(BloomPackager)[tuple]{bytearray} - scope-58
 Tez vertex scope-53
 # Plan on vertex
 d: BloomFilter Rearrange[tuple]{int}(false) - scope-34 <-       scope-57       
->       scope-56

Modified: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld?rev=1842768&r1=1842767&r2=1842768&view=diff
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
 (original)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-BloomJoin-7-KeyToReducer.gld
 Wed Oct  3 22:39:09 2018
@@ -12,7 +12,7 @@ Tez vertex scope-51
 
 Tez vertex scope-43
 # Plan on vertex
-a: Split - scope-58
+a: Split - scope-55
 |   |
 |   e: BuildBloom Rearrange[tuple]{int}(false) - scope-36      ->      [ 
scope-51, scope-52]
 |   |   |
@@ -41,15 +41,11 @@ a: Split - scope-58
     |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-52
 # Combine plan on edge <scope-43>
-Local Rearrange[tuple]{int}(false) - scope-57  ->       scope-52
-|   |
-|   Project[int][0] - scope-56
-|
-|---Package(BloomPackager)[tuple]{int} - scope-55
+org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner$Combine
 # Plan on vertex
 POValueOutputTez - scope-54    ->       [scope-45, scope-47]
 |
-|---Package(BloomPackager)[tuple]{int} - scope-53
+|---Package(BloomPackager)[tuple]{bytearray} - scope-53
 Tez vertex scope-45
 # Plan on vertex
 e: BloomFilter Rearrange[tuple]{int}(false) - scope-32 <-       scope-52       
->       scope-51


Reply via email to