Author: rohini
Date: Fri Oct 12 16:57:50 2018
New Revision: 1843689

URL: http://svn.apache.org/viewvc?rev=1843689&view=rev
Log:
PIG-5255: Improvements to bloom join (satishsaley via rohini)

Added:
    pig/trunk/shade/
    pig/trunk/shade/roaringbitmap/
    pig/trunk/shade/roaringbitmap/pom.xml
    pig/trunk/src/org/apache/pig/impl/bloom/
    pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java
    pig/trunk/src/org/apache/pig/impl/bloom/Hash.java
    pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java
    pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java
    pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java
    pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java
    pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/ivy.xml
    pig/trunk/ivy/libraries.properties
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
    pig/trunk/src/org/apache/pig/impl/util/JarManager.java
    pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 12 16:57:50 2018
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-5255: Improvements to bloom join (satishsaley via rohini)
+
 PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)
 
 PIG-5357: BagFactory interface should support creating a distinct bag from a 
set (jtolar via rohini)

Modified: pig/trunk/build.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Fri Oct 12 16:57:50 2018
@@ -399,6 +399,7 @@
             <include name="joda-time-${joda-time.version}.jar"/>
             <include name="automaton-${automaton.version}.jar"/>
             <include name="jansi-${jansi.version}.jar"/>
+            <include 
name="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/>
         </patternset>
     </fileset>
 
@@ -741,6 +742,7 @@
             <fileset dir="${ivy.lib.dir}" 
includes="parser-core-${basjes-httpdlog-pigloader.version}.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="ivy-*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" 
includes="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/>
         </copy>
     </target>
 
@@ -1716,6 +1718,10 @@
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" 
log="${loglevel}"
                  
pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" 
conf="spark${sparkversion},hbase${hbaseversion}"/>
        <ivy:cachepath pathid="compile.classpath" conf="compile"/>
+       <exec dir="${basedir}/shade/roaringbitmap" executable="mvn">
+          <arg line="clean package 
-Droaring.bitmap.version=${roaring-bitmap-shaded.version}"/>
+        </exec>
+       <copy 
file="${basedir}/shade/roaringbitmap/target/RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"
 todir="${ivy.lib.dir}"/>
      </target>
 
      <target name="ivy-test" depends="ivy-resolve" description="Retrieve 
Ivy-managed artifacts for test configuration">

Modified: pig/trunk/ivy.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- pig/trunk/ivy.xml (original)
+++ pig/trunk/ivy.xml Fri Oct 12 16:57:50 2018
@@ -235,6 +235,9 @@
     <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" 
conf="compile->master"/>
     <dependency org="asm" name="asm" rev="${asm.version}" 
conf="compile->default"/>
 
+    <!-- Dependencies for bloom join -->
+    <dependency org="org.roaringbitmap" name="RoaringBitmap" 
rev="${roaring-bitmap-shaded.version}" conf="compile->master"/>
+
     <!-- HBase dependency in format for releases higher or equal to 0.95 -->
     <dependency org="org.apache.hbase" name="hbase-client" 
rev="${hbase1.version}" conf="hbase1->master">
       <artifact name="hbase-client" type="jar"/>

Modified: pig/trunk/ivy/libraries.properties
URL: 
http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- pig/trunk/ivy/libraries.properties (original)
+++ pig/trunk/ivy/libraries.properties Fri Oct 12 16:57:50 2018
@@ -96,4 +96,5 @@ htrace.version=3.1.0-incubating
 htrace4.version=4.0.1-incubating
 commons-lang3.version=3.6
 scala-xml.version=1.0.5
-glassfish.el.version=3.0.1-b08
\ No newline at end of file
+glassfish.el.version=3.0.1-b08
+roaring-bitmap-shaded.version=0.7.14
\ No newline at end of file

Added: pig/trunk/shade/roaringbitmap/pom.xml
URL: 
http://svn.apache.org/viewvc/pig/trunk/shade/roaringbitmap/pom.xml?rev=1843689&view=auto
==============================================================================
--- pig/trunk/shade/roaringbitmap/pom.xml (added)
+++ pig/trunk/shade/roaringbitmap/pom.xml Fri Oct 12 16:57:50 2018
@@ -0,0 +1,80 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>14</version>
+    </parent>
+
+    <groupId>org.apache.pig</groupId>
+    <artifactId>RoaringBitmap-shaded</artifactId>
+    <packaging>jar</packaging>
+    <version>${roaring.bitmap.version}</version>
+
+    <name>Pig RoaringBitmap</name>
+    <url>http://pig.apache.org</url>
+    <prerequisites>
+        <maven>3.0</maven>
+    </prerequisites>
+
+    <modules>
+    </modules>
+
+    <properties>
+        <maven.shade.plugin.version>2.4.3</maven.shade.plugin.version>
+        <roaring.bitmap.version>0.7.14</roaring.bitmap.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.roaringbitmap</groupId>
+            <artifactId>RoaringBitmap</artifactId>
+            <version>${roaring.bitmap.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven.shade.plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>shade-asm</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.roaringbitmap</pattern>
+                                    
<shadedPattern>org.apache.pig.org.roaringbitmap</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
 Fri Oct 12 16:57:50 2018
@@ -22,17 +22,16 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
-import org.apache.pig.builtin.BuildBloomBase;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
 
 public class BloomPackager extends Packager {
 
@@ -103,10 +102,10 @@ public class BloomPackager extends Packa
         Iterator<Tuple> iter = bags[0].iterator();
         Tuple tup = iter.next();
         DataByteArray bloomBytes = (DataByteArray) tup.get(0);
-        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        BloomFilter bloomFilter = BloomFilter.bloomIn(bloomBytes);
         while (iter.hasNext()) {
             tup = iter.next();
-            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+            bloomFilter.or(BloomFilter.bloomIn((DataByteArray) tup.get(0)));
         }
 
         Object partition = key;
@@ -160,4 +159,4 @@ public class BloomPackager extends Packa
     public boolean isBloomCreatedInMap() {
         return bloomCreatedInMap;
     }
-}
\ No newline at end of file
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
 Fri Oct 12 16:57:50 2018
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -32,12 +31,12 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
-import org.apache.pig.builtin.BuildBloomBase;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -106,7 +105,7 @@ public class POBloomFilterRearrangeTez e
                 }
                 Tuple val = (Tuple) reader.getCurrentValue();
                 int index = (int) val.get(0);
-                bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) 
val.get(1));
+                bloomFilters[index] = BloomFilter.bloomIn((DataByteArray) 
val.get(1));
             }
             ObjectCache.getInstance().cache(cacheKey, bloomFilters);
         } catch (Exception e) {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
 Fri Oct 12 16:57:50 2018
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +36,7 @@ import org.apache.pig.classification.Int
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableTuple;

Added: pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java Fri Oct 12 
16:57:50 2018
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.util.bloom.Filter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.org.roaringbitmap.RoaringBitmap;
+
+public class BloomFilter extends Filter {
+    private static final Log LOG = LogFactory.getLog(BloomFilter.class);
+    private static final int VERSION = 1;
+    private HashFunction hash;
+    private RoaringBitmap bitmap;
+    private int hashAlgorithm;
+
+    /**
+     * Read the fields using bloomIn
+     */
+    public BloomFilter() {
+        super();
+    }
+
+    public BloomFilter(int vectorSize, int nbHash, int hashAlgorithm) {
+        super.vectorSize = vectorSize;
+        super.nbHash = nbHash;
+        this.hashAlgorithm = hashAlgorithm;
+        this.hash = new HashFunction(vectorSize, nbHash, hashAlgorithm);
+        this.bitmap = new RoaringBitmap();
+    }
+
+    @Override
+    public void add(Key key) {
+        if(key == null) {
+            throw new NullPointerException("key cannot be null");
+        }
+        int[] h = hash.hash(key);
+        hash.clear();
+        Arrays.sort(h);
+        this.bitmap.or(RoaringBitmap.bitmapOf(h));
+    }
+
+    @Override
+    public boolean membershipTest(Key key) {
+        if(key == null) {
+            throw new NullPointerException("key cannot be null");
+        }
+        int[] h = hash.hash(key);
+        hash.clear();
+        for(int i = 0; i < nbHash; i++) {
+            if(!this.bitmap.contains(h[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void and(Filter filter) {
+        if(filter == null
+                || !(filter instanceof BloomFilter)
+                || ((BloomFilter)filter).vectorSize != this.vectorSize
+                || ((BloomFilter)filter).nbHash != this.nbHash) {
+              throw new IllegalArgumentException("filters cannot be and-ed");
+        }
+        this.bitmap.and(((BloomFilter)filter).bitmap);
+    }
+
+    @Override
+    public void or(Filter filter) {
+        if(filter == null
+                || !(filter instanceof BloomFilter)
+                || ((BloomFilter)filter).vectorSize != this.vectorSize
+                || ((BloomFilter)filter).nbHash != this.nbHash) {
+              throw new IllegalArgumentException("filters cannot be or-ed");
+        }
+        this.bitmap.or(((BloomFilter)filter).bitmap);
+    }
+
+    @Override
+    public void xor(Filter filter) {
+        if(filter == null
+                || !(filter instanceof BloomFilter)
+                || ((BloomFilter)filter).vectorSize != this.vectorSize
+                || ((BloomFilter)filter).nbHash != this.nbHash) {
+              throw new IllegalArgumentException("filters cannot be xor-ed");
+        }
+        this.bitmap.xor(((BloomFilter)filter).bitmap);
+    }
+
+    @Override
+    public void not() {
+        this.bitmap.flip(0, vectorSize);
+    }
+
+    @Override
+    public String toString() {
+      return this.bitmap.toString();
+    }
+
+    /**
+     * @return size of the the bloomfilter
+     */
+    public int getVectorSize() {
+      return this.vectorSize;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(VERSION);
+        out.writeInt(this.nbHash);
+        out.writeByte(this.hashAlgorithm);
+        out.writeInt(this.vectorSize);
+        this.bitmap.runOptimize();
+        ByteArrayOutputStream bos = compressBitmap();
+        LOG.info("Compressed bitmap from " + String.format("%,8d bytes", 
this.bitmap.getSizeInBytes())
+        + " to "+ String.format("%,8d bytes", bos.size()));
+        out.writeInt(bos.size());
+        out.write(bos.toByteArray());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int ver = in.readInt();
+        if (ver == VERSION) {
+            this.nbHash = in.readInt();
+            this.hashAlgorithm = in.readByte();
+        } else {
+            throw new IOException("Unsupported version: " + ver);
+        }
+        this.vectorSize = in.readInt();
+        this.hash = new HashFunction(this.vectorSize, this.nbHash, 
this.hashAlgorithm);
+        this.bitmap = new RoaringBitmap();
+        int compressedSize = in.readInt();
+        byte[] buf = new byte[compressedSize];
+        in.readFully(buf);
+        this.bitmap.deserialize(decompressBitmap(buf));
+    }
+
+    public static BloomFilter bloomIn(DataByteArray b) throws IOException {
+        DataInputStream dis = new DataInputStream(new
+            ByteArrayInputStream(b.get()));
+        BloomFilter f = new BloomFilter();
+        f.readFields(dis);
+        return f;
+    }
+    private ByteArrayOutputStream compressBitmap() throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        BZip2Codec bzip = new BZip2Codec();
+        bzip.setConf(new Configuration(false));
+        CompressionOutputStream compressionOut = bzip.createOutputStream(bos);
+        DataOutputStream dos = new DataOutputStream(compressionOut);
+        this.bitmap.serialize(dos);
+        compressionOut.finish();
+        dos.flush();
+        return bos;
+    }
+
+    private DataInput decompressBitmap(byte[] buffer) throws IOException {
+        ByteArrayInputStream deCompressedDataBuffer = new 
ByteArrayInputStream(buffer, 0, buffer.length);
+        BZip2Codec bzip = new BZip2Codec();
+        bzip.setConf(new Configuration(false));
+        CompressionInputStream compressionIn = 
bzip.createInputStream(deCompressedDataBuffer);
+        DataInputStream inflateIn = new DataInputStream(compressionIn);
+        return inflateIn;
+    }
+}

Added: pig/trunk/src/org/apache/pig/impl/bloom/Hash.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/Hash.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/Hash.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/Hash.java Fri Oct 12 16:57:50 2018
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public abstract class Hash {
+    /** Identifier of the Murmur3 hash, created as {@link Murmur3Hash}. */
+    public static final int MURMUR = 1;
+    /** Alias of {@link #MURMUR}. */
+    public static final int MURMUR3 = MURMUR;
+    /** Identifier created as {@link KirschMitzenmacherHash}. */
+    public static final int MURMUR3KM = 2;
+    /** Identifier of the Jenkins hash, created as {@link JenkinsHash}. */
+    public static final int JENKINS = 3;
+
+    /**
+     * Maps a user-supplied algorithm name to its identifier (case-insensitive).
+     *
+     * @param hashType one of {@code murmur}, {@code murmur3km} or {@code jenkins}
+     * @return {@link #MURMUR3}, {@link #MURMUR3KM} or {@link #JENKINS}
+     * @throws IllegalArgumentException if the name is not recognised (including {@code null})
+     */
+    public static int parseHashType(String hashType) {
+        if ("murmur".equalsIgnoreCase(hashType)) {
+            return MURMUR3;
+        } else if ("murmur3km".equalsIgnoreCase(hashType)) {
+            return MURMUR3KM;
+        } else if ("jenkins".equalsIgnoreCase(hashType)) {
+            return JENKINS;
+        }
+        throw new IllegalArgumentException("Hash Algorithm values must be one of - murmur, murmur3km, jenkins");
+    }
+
+    /**
+     * Creates a new hash implementation for an algorithm identifier.
+     *
+     * @param hashType {@link #MURMUR} (same as {@link #MURMUR3}), {@link #MURMUR3KM} or {@link #JENKINS}
+     * @return a new instance of the matching implementation
+     * @throws IllegalArgumentException for any other value
+     */
+    public static Hash getInstance(int hashType) {
+        switch (hashType) {
+            case MURMUR:
+                return new Murmur3Hash();
+            case MURMUR3KM:
+                return new KirschMitzenmacherHash();
+            case JENKINS:
+                return new JenkinsHash();
+            default:
+                throw new IllegalArgumentException("Hash type values must be one of - 1 (murmur), 2 (murmur3km), 3 (jenkins)");
+        }
+    }
+
+    /**
+     * Hashes a byte array into {@code nbHash} values.
+     *
+     * @param bytes    the data to hash
+     * @param maxValue bound the hashed values are reduced to (e.g. {@link JenkinsHash}
+     *                 returns values in [0, maxValue))
+     * @param nbHash   the number of hashed values to produce
+     * @return an array of {@code nbHash} hashed values
+     */
+    public abstract int[] hash(byte[] bytes, int maxValue, int nbHash);
+
+}

Added: pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java Fri Oct 12 
16:57:50 2018
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+import org.apache.hadoop.util.bloom.Key;
+
+public class HashFunction {
+    /** The number of hashed values. */
+    protected int nbHash;
+    /** The maximum highest returned value. */
+    protected int maxValue;
+    /** Hashing algorithm to use. */
+    protected Hash hashAlgorithm;
+
+    /**
+     * Builds a hash function that produces {@code nbHash} values per key,
+     * each bounded by {@code maxValue}.
+     *
+     * @param maxValue      bound handed to the hashing algorithm; must be positive
+     * @param nbHash        number of hashed values per key; must be positive
+     * @param hashAlgorithm algorithm identifier, resolved with {@link Hash#getInstance(int)}
+     * @throws IllegalArgumentException if a bound is not positive or the algorithm is unknown
+     */
+    public HashFunction(int maxValue, int nbHash, int hashAlgorithm) {
+        requirePositive(maxValue, "maxValue must be > 0");
+        requirePositive(nbHash, "nbHash must be > 0");
+        this.maxValue = maxValue;
+        this.nbHash = nbHash;
+        this.hashAlgorithm = Hash.getInstance(hashAlgorithm);
+    }
+
+    /** Resets this hash function; it keeps no per-call state, so this does nothing. */
+    public void clear() {
+        // intentionally empty
+    }
+
+    /**
+     * Hashes the bytes of a key into {@link #nbHash} integers.
+     *
+     * @param k the key to hash
+     * @return the values produced by the configured algorithm
+     * @throws NullPointerException     if the key's byte buffer is null
+     * @throws IllegalArgumentException if the key's byte buffer is empty
+     */
+    public int[] hash(Key k) {
+        final byte[] keyBytes = k.getBytes();
+        if (keyBytes == null) {
+            throw new NullPointerException("buffer reference is null");
+        } else if (keyBytes.length == 0) {
+            throw new IllegalArgumentException("key length must be > 0");
+        }
+        return hashAlgorithm.hash(keyBytes, maxValue, nbHash);
+    }
+
+    /** Throws IllegalArgumentException with {@code message} unless {@code value} is strictly positive. */
+    private static void requirePositive(int value, String message) {
+        if (value <= 0) {
+            throw new IllegalArgumentException(message);
+        }
+    }
+}
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java Fri Oct 12 
16:57:50 2018
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Copyright Felix Gessert and Florian Bücklers. All rights reserved. 
Permission is hereby granted,
+ * free of charge, to any person obtaining a copy of this software and 
associated documentation files
+ * (the "Software"), to deal in the Software without restriction, including 
without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
sell copies of the
+ * Software, and to permit persons to whom the Software is furnished to do so, 
subject to the following
+ * conditions:
+ * The above copyright notice and this permission notice shall be included in 
all copies or substantial
+ * portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
IMPLIED, INCLUDING BUT
+ * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR 
PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 
DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF 
OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+package org.apache.pig.impl.bloom;
+
+import java.util.function.BiFunction;
+
+/**
+ * Murmur3-based helpers that map a byte array to several positions in the
+ * range [0, m).
+ * <p>
+ * Taken from https://github.com/Baqend/Orestes-Bloomfilter
+ */
+public class HashProvider {
+
+    /**
+     * Computes {@code k} positions in [0, m) by iterating 32-bit Murmur3: each
+     * round hashes {@code value} seeded with the previous round's raw hash
+     * (starting from seed 0), and the result is kept only if
+     * {@link #rejectionSample(int, int)} accepts it.
+     *
+     * @param value bytes to hash
+     * @param m     size of the output range; must be positive
+     * @param k     number of positions to return
+     * @return an array of {@code k} positions in [0, m)
+     */
+    public static int[] hashMurmur3(byte[] value, int m, int k) {
+        return rejectionSample(HashProvider::murmur3_signed, value, m, k);
+    }
+
+    /**
+     * Computes {@code k} positions with Kirsch-Mitzenmacher double hashing:
+     * position {@code i} is {@code (h1 + i * h2) mod m}. Here {@code h1} is Murmur3
+     * of {@code value} with seed 0, and {@code h2} is Murmur3 of {@code value}
+     * seeded with {@code h1}. Both are taken as unsigned 32-bit values, so every
+     * position is non-negative.
+     *
+     * @param value bytes to hash
+     * @param m     size of the output range; must be positive
+     * @param k     number of positions to return
+     * @return an array of {@code k} positions in [0, m)
+     */
+    public static int[] hashKirschMitzenmacher(byte[] value, int m, int k) {
+        int[] result = new int[k];
+        long hash1 = murmur3(0, value);
+        long hash2 = murmur3((int) hash1, value);
+        for (int i = 0; i < k; i++) {
+            // long arithmetic, so h1 + i * h2 does not wrap for practical values of k
+            result[i] = (int) ((hash1 + i * hash2) % m);
+        }
+        return result;
+    }
+
+    /** Murmur3 of {@code bytes} with the given seed, widened to an unsigned value in [0, 2^32). */
+    private static long murmur3(int seed, byte[] bytes) {
+        return Integer.toUnsignedLong(murmur3_signed(seed, bytes));
+    }
+
+    /**
+     * 32-bit MurmurHash3 (x86_32 variant) of {@code bytes}, returned as a signed int.
+     * Constants and finalization mirror Guava's murmur3_32. The statements are
+     * order-sensitive and must not be rearranged.
+     */
+    private static int murmur3_signed(int seed, byte[] bytes) {
+        int h1 = seed;
+        //Standard in Guava
+        int c1 = 0xcc9e2d51;
+        int c2 = 0x1b873593;
+        int len = bytes.length;
+        int i = 0;
+
+        // Body: mix each full 4-byte little-endian block into h1
+        while (len >= 4) {
+            //process()
+            int k1  = (bytes[i++] & 0xFF);
+                k1 |= (bytes[i++] & 0xFF) << 8;
+                k1 |= (bytes[i++] & 0xFF) << 16;
+                k1 |= (bytes[i++] & 0xFF) << 24;
+
+            k1 *= c1;
+            k1 = Integer.rotateLeft(k1, 15);
+            k1 *= c2;
+
+            h1 ^= k1;
+            h1 = Integer.rotateLeft(h1, 13);
+            h1 = h1 * 5 + 0xe6546b64;
+
+            len -= 4;
+        }
+
+        // Tail: fold the remaining 0-3 bytes into h1 (cases deliberately fall through)
+        //processingRemaining()
+        int k1 = 0;
+        switch (len) {
+            case 3:
+                k1 ^= (bytes[i + 2] & 0xFF) << 16;
+                // fall through
+            case 2:
+                k1 ^= (bytes[i + 1] & 0xFF) << 8;
+                // fall through
+            case 1:
+                k1 ^= (bytes[i] & 0xFF);
+
+                k1 *= c1;
+                k1 = Integer.rotateLeft(k1, 15);
+                k1 *= c2;
+                h1 ^= k1;
+        }
+        // after this, i equals bytes.length (the total input length)
+        i += len;
+
+        // Finalization: mix in the input length, then avalanche the bits
+        //makeHash()
+        h1 ^= i;
+
+        h1 ^= h1 >>> 16;
+        h1 *= 0x85ebca6b;
+        h1 ^= h1 >>> 13;
+        h1 *= 0xc2b2ae35;
+        h1 ^= h1 >>> 16;
+
+        return h1;
+    }
+
+    /**
+     * Performs rejection sampling on a random 32-bit Java int (sampled from
+     * Integer.MIN_VALUE to Integer.MAX_VALUE) so that it can be reduced modulo
+     * {@code m}.
+     * <p>
+     * The sign is dropped with {@link Math#abs(int)}. {@code Integer.MIN_VALUE}
+     * has no positive counterpart (abs leaves it negative), so it is rejected
+     * explicitly. Values greater than the largest multiple of {@code m} that fits
+     * in an int are also rejected.
+     * <p>
+     * NOTE(review): the comparison still accepts that multiple itself, and folding
+     * negatives with abs gives 0 fewer preimages. The reduction is therefore very
+     * slightly non-uniform. Changing it would change which bits get set, so it is
+     * documented rather than altered.
+     *
+     * @param random int to sample
+     * @param m      size of the output range; must be positive
+     * @return {@code |random| % m}, a value in [0, m), or -1 if the value is rejected
+     */
+    private static int rejectionSample(int random, int m) {
+        random = Math.abs(random);
+        if (random > (2147483647 - 2147483647 % m)
+                || random == Integer.MIN_VALUE)
+            return -1;
+        else
+            return random % m;
+    }
+
+    /**
+     * Collects {@code k} accepted samples. {@code hashFunction} is applied
+     * repeatedly, each time seeded with its previous raw output (the first call
+     * uses seed 0). Every output accepted by {@link #rejectionSample(int, int)}
+     * becomes the next position. Rejected outputs still seed the next call.
+     * The loop runs until {@code k} outputs have been accepted.
+     *
+     * @param hashFunction {@code (seed, bytes) -> 32-bit hash}
+     * @param value        bytes to hash
+     * @param m            size of the output range
+     * @param k            number of positions to return
+     * @return an array of {@code k} positions in [0, m)
+     */
+    private static int[] rejectionSample(BiFunction<Integer, byte[], Integer> 
hashFunction, byte[] value, int m, int k) {
+        int[] hashes = new int[k];
+        int seed = 0;
+        int pos = 0;
+        while (pos < k) {
+            seed = hashFunction.apply(seed, value);
+            int hash = rejectionSample(seed, m);
+            if (hash != -1) {
+                hashes[pos++] = hash;
+            }
+        }
+        return hashes;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java Fri Oct 12 
16:57:50 2018
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class JenkinsHash extends Hash {
+
+    /** Underlying Hadoop Jenkins hash; assigned once and never replaced. */
+    private final org.apache.hadoop.util.hash.Hash hashFunction = org.apache.hadoop.util.hash.JenkinsHash.getInstance();
+
+    /**
+     * Computes {@code nbHash} bucket positions for {@code bytes} by chaining the Jenkins hash.
+     * The first round is seeded with {@code 0}. Every later round is seeded with the raw hash
+     * produced by the previous round.
+     *
+     * @param bytes    key to hash
+     * @param maxValue exclusive upper bound of the returned positions; assumed {@code > 0}
+     * @param nbHash   number of positions to produce
+     * @return an array of {@code nbHash} positions, each in {@code [0, maxValue)}
+     */
+    @Override
+    public int[] hash(byte[] bytes, int maxValue, int nbHash) {
+        int[] result = new int[nbHash];
+        int initval = 0;
+        for (int i = 0; i < nbHash; i++) {
+            initval = hashFunction.hash(bytes, initval);
+            // The remainder's magnitude is already below maxValue, so Math.abs cannot overflow here.
+            result[i] = Math.abs(initval % maxValue);
+        }
+        return result;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java Fri Oct 
12 16:57:50 2018
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class KirschMitzenmacherHash extends Hash {
+
+    /**
+     * Returns {@code numHash} bucket positions for {@code bytes}. The positions are computed by
+     * {@code HashProvider.hashKirschMitzenmacher}; this class only adapts that helper to the
+     * {@link Hash} API. (Presumably the Kirsch-Mitzenmacher double-hashing scheme, as the name
+     * suggests; confirm in HashProvider.)
+     *
+     * @param bytes    key to hash
+     * @param maxValue bound on the returned positions, passed straight through to the helper
+     * @param numHash  number of positions to produce
+     * @return the positions computed by the helper
+     */
+    @Override
+    public int[] hash(byte[] bytes, int maxValue, int numHash) {
+        final int[] positions = HashProvider.hashKirschMitzenmacher(bytes, maxValue, numHash);
+        return positions;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java?rev=1843689&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java (added)
+++ pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java Fri Oct 12 
16:57:50 2018
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class Murmur3Hash extends Hash {
+
+    /**
+     * Returns {@code numHash} bucket positions for {@code bytes}. The positions are computed by
+     * {@code HashProvider.hashMurmur3}; this class only adapts that helper to the {@link Hash} API.
+     *
+     * @param bytes    key to hash
+     * @param maxValue bound on the returned positions, passed straight through to the helper
+     * @param numHash  number of positions to produce
+     * @return the positions computed by the helper
+     */
+    @Override
+    public int[] hash(byte[] bytes, int maxValue, int numHash) {
+        final int[] positions = HashProvider.hashMurmur3(bytes, maxValue, numHash);
+        return positions;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Fri Oct 12 16:57:50 
2018
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.org.roaringbitmap.RoaringBitmap;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.joda.time.DateTime;
 
@@ -66,7 +67,8 @@ public class JarManager {
         BZIP2R(BZip2Constants.class),
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
-        JODATIME(DateTime.class);
+        JODATIME(DateTime.class),
+        SHADED_ROARING_BITMAP(RoaringBitmap.class);
 
         private final Class pkgClass;
 

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1843689&r1=1843688&r2=1843689&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Fri Oct 12 
16:57:50 2018
@@ -130,7 +130,7 @@ public class TestJobControlCompiler {
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
     // guava jar is not shipped with Hadoop 2.x
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, 
fileClassPaths.length);
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 6, 
fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, 
distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front
@@ -234,11 +234,11 @@ public class TestJobControlCompiler {
           // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
           System.out.println("cache.files= " + Arrays.toString(cacheURIs));
           System.out.println("classpath.files= " + 
Arrays.toString(fileClassPaths));
-          // Default jars - 5 (pig, antlr, joda-time, automaton)
+          // Default jars - 5 (pig, antlr, joda-time, automaton, 
roaring-bitmap-shaded)
           // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, 
udf2.jar, udf1.jar, another.jar
-          Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+          Assert.assertEquals("size 10 for " + Arrays.toString(cacheURIs), 10,
                   Arrays.asList(StringUtils.join(cacheURIs, 
",").split(",")).size());
-          Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 
9,
+          Assert.assertEquals("size 10 for " + 
Arrays.toString(fileClassPaths), 10,
                   Arrays.asList(StringUtils.join(fileClassPaths, 
",").split(",")).size());
           // Count occurrences of the resources
           Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -248,7 +248,7 @@ public class TestJobControlCompiler {
               val = (val == null) ? 1 : ++val;
               occurrences.put(cacheURI.toString(), val);
           }
-          Assert.assertEquals(9, occurrences.size());
+          Assert.assertEquals(10, occurrences.size());
 
           for (String file : occurrences.keySet()) {
               // check that only single occurrence even though we added once 
to dist cache (simulating via Oozie)


Reply via email to