Author: rohini Date: Fri Oct 12 16:57:50 2018 New Revision: 1843689 URL: http://svn.apache.org/viewvc?rev=1843689&view=rev Log: PIG-5255: Improvements to bloom join (satishsaley via rohini)
Added: pig/trunk/shade/ pig/trunk/shade/roaringbitmap/ pig/trunk/shade/roaringbitmap/pom.xml pig/trunk/src/org/apache/pig/impl/bloom/ pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java pig/trunk/src/org/apache/pig/impl/bloom/Hash.java pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java Modified: pig/trunk/CHANGES.txt pig/trunk/build.xml pig/trunk/ivy.xml pig/trunk/ivy/libraries.properties pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java pig/trunk/src/org/apache/pig/impl/util/JarManager.java pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Fri Oct 12 16:57:50 2018 @@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley IMPROVEMENTS +PIG-5255: Improvements to bloom join (satishsaley via rohini) + PIG-5359: Reduce time spent in split serialization (satishsaley via rohini) PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini) Modified: pig/trunk/build.xml URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/build.xml (original) +++ pig/trunk/build.xml Fri Oct 12 16:57:50 2018 @@ -399,6 +399,7 @@ 
<include name="joda-time-${joda-time.version}.jar"/> <include name="automaton-${automaton.version}.jar"/> <include name="jansi-${jansi.version}.jar"/> + <include name="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/> </patternset> </fileset> @@ -741,6 +742,7 @@ <fileset dir="${ivy.lib.dir}" includes="parser-core-${basjes-httpdlog-pigloader.version}.jar"/> <fileset dir="${ivy.lib.dir}" includes="ivy-*.jar"/> <fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/> + <fileset dir="${ivy.lib.dir}" includes="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/> </copy> </target> @@ -1716,6 +1718,10 @@ <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}" pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion},hbase${hbaseversion}"/> <ivy:cachepath pathid="compile.classpath" conf="compile"/> + <exec dir="${basedir}/shade/roaringbitmap" executable="mvn"> + <arg line="clean package -Droaring.bitmap.version=${roaring-bitmap-shaded.version}"/> + </exec> + <copy file="${basedir}/shade/roaringbitmap/target/RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar" todir="${ivy.lib.dir}"/> </target> <target name="ivy-test" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for test configuration"> Modified: pig/trunk/ivy.xml URL: http://svn.apache.org/viewvc/pig/trunk/ivy.xml?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/ivy.xml (original) +++ pig/trunk/ivy.xml Fri Oct 12 16:57:50 2018 @@ -235,6 +235,9 @@ <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/> <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/> + <!-- Dependencies for bloom join --> + <dependency org="org.roaringbitmap" name="RoaringBitmap" rev="${roaring-bitmap-shaded.version}" conf="compile->master"/> + <!-- HBase dependency in format for 
releases higher or equal to 0.95 --> <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase1.version}" conf="hbase1->master"> <artifact name="hbase-client" type="jar"/> Modified: pig/trunk/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/trunk/ivy/libraries.properties?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/ivy/libraries.properties (original) +++ pig/trunk/ivy/libraries.properties Fri Oct 12 16:57:50 2018 @@ -96,4 +96,5 @@ htrace.version=3.1.0-incubating htrace4.version=4.0.1-incubating commons-lang3.version=3.6 scala-xml.version=1.0.5 -glassfish.el.version=3.0.1-b08 \ No newline at end of file +glassfish.el.version=3.0.1-b08 +roaring-bitmap-shaded.version=0.7.14 \ No newline at end of file Added: pig/trunk/shade/roaringbitmap/pom.xml URL: http://svn.apache.org/viewvc/pig/trunk/shade/roaringbitmap/pom.xml?rev=1843689&view=auto ============================================================================== --- pig/trunk/shade/roaringbitmap/pom.xml (added) +++ pig/trunk/shade/roaringbitmap/pom.xml Fri Oct 12 16:57:50 2018 @@ -0,0 +1,80 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache</groupId> + <artifactId>apache</artifactId> + <version>14</version> + </parent> + + <groupId>org.apache.pig</groupId> + <artifactId>RoaringBitmap-shaded</artifactId> + <packaging>jar</packaging> + <version>${roaring.bitmap.version}</version> + + <name>Pig RoaringBitmap</name> + <url>http://pig.apache.org</url> + <prerequisites> + <maven>3.0</maven> + </prerequisites> + + <modules> + </modules> + + <properties> + <maven.shade.plugin.version>2.4.3</maven.shade.plugin.version> + <roaring.bitmap.version>0.7.14</roaring.bitmap.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.roaringbitmap</groupId> + <artifactId>RoaringBitmap</artifactId> + <version>${roaring.bitmap.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${maven.shade.plugin.version}</version> + <executions> + <execution> + <id>shade-asm</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <relocations> + <relocation> + <pattern>org.roaringbitmap</pattern> + <shadedPattern>org.apache.pig.org.roaringbitmap</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Fri Oct 12 16:57:50 2018 @@ -22,17 +22,16 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Iterator; -import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; -import org.apache.pig.builtin.BuildBloomBase; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.bloom.BloomFilter; public class BloomPackager extends Packager { @@ -103,10 +102,10 @@ public class BloomPackager extends Packa Iterator<Tuple> iter = bags[0].iterator(); Tuple tup = iter.next(); DataByteArray bloomBytes = (DataByteArray) tup.get(0); - BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes); + BloomFilter bloomFilter = BloomFilter.bloomIn(bloomBytes); while (iter.hasNext()) { tup = iter.next(); - bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0))); + bloomFilter.or(BloomFilter.bloomIn((DataByteArray) tup.get(0))); } Object partition = key; @@ -160,4 +159,4 @@ public class BloomPackager extends Packa public boolean isBloomCreatedInMap() { return bloomCreatedInMap; } -} \ No newline at end of file +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Fri Oct 12 16:57:50 2018 @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; @@ -32,12 +31,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; -import org.apache.pig.builtin.BuildBloomBase; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.bloom.BloomFilter; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.tez.runtime.api.LogicalInput; @@ -106,7 +105,7 @@ public class POBloomFilterRearrangeTez e } Tuple val = (Tuple) reader.getCurrentValue(); int index = (int) val.get(0); - bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1)); + bloomFilters[index] = BloomFilter.bloomIn((DataByteArray) val.get(1)); } ObjectCache.getInstance().cache(cacheKey, bloomFilters); } 
catch (Exception e) { Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Fri Oct 12 16:57:50 2018 @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.bloom.Key; import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; @@ -37,6 +36,7 @@ import org.apache.pig.classification.Int import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.bloom.BloomFilter; import org.apache.pig.impl.io.NullableBytesWritable; import org.apache.pig.impl.io.NullableIntWritable; import org.apache.pig.impl.io.NullableTuple; Added: pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/BloomFilter.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.impl.bloom; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.CompressionInputStream; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.util.bloom.Filter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.org.roaringbitmap.RoaringBitmap; +/** Bloom filter whose bit vector is a shaded RoaringBitmap; serialized as a version header and parameters followed by the BZip2-compressed bitmap. */ +public class BloomFilter extends Filter { + private static final Log LOG = LogFactory.getLog(BloomFilter.class); + private static final int VERSION = 1; + private HashFunction hash; + private RoaringBitmap bitmap; + private int hashAlgorithm; + + /** + * Creates an uninitialized filter; populate it with readFields or obtain one via bloomIn + */ + public BloomFilter() { + super(); + } + + public BloomFilter(int vectorSize, int nbHash, int hashAlgorithm) { + super.vectorSize = vectorSize; + super.nbHash = nbHash; + this.hashAlgorithm = hashAlgorithm; +
this.hash = new HashFunction(vectorSize, nbHash, hashAlgorithm); + this.bitmap = new RoaringBitmap(); + } + + @Override + public void add(Key key) { + if(key == null) { + throw new NullPointerException("key cannot be null"); + } + int[] h = hash.hash(key); + hash.clear(); + Arrays.sort(h); + this.bitmap.or(RoaringBitmap.bitmapOf(h)); + } + + @Override + public boolean membershipTest(Key key) { + if(key == null) { + throw new NullPointerException("key cannot be null"); + } + int[] h = hash.hash(key); + hash.clear(); + for(int i = 0; i < nbHash; i++) { + if(!this.bitmap.contains(h[i])) { + return false; + } + } + return true; + } + /** Intersects this filter with another BloomFilter of the same vectorSize and nbHash. NOTE(review): hashAlgorithm is not compared here (nor in or/xor) - confirm that filters built with different hash types can never be combined. */ + @Override + public void and(Filter filter) { + if(filter == null + || !(filter instanceof BloomFilter) + || ((BloomFilter)filter).vectorSize != this.vectorSize + || ((BloomFilter)filter).nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be and-ed"); + } + this.bitmap.and(((BloomFilter)filter).bitmap); + } + + @Override + public void or(Filter filter) { + if(filter == null + || !(filter instanceof BloomFilter) + || ((BloomFilter)filter).vectorSize != this.vectorSize + || ((BloomFilter)filter).nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be or-ed"); + } + this.bitmap.or(((BloomFilter)filter).bitmap); + } + + @Override + public void xor(Filter filter) { + if(filter == null + || !(filter instanceof BloomFilter) + || ((BloomFilter)filter).vectorSize != this.vectorSize + || ((BloomFilter)filter).nbHash != this.nbHash) { + throw new IllegalArgumentException("filters cannot be xor-ed"); + } + this.bitmap.xor(((BloomFilter)filter).bitmap); + } + + @Override + public void not() { + this.bitmap.flip(0, vectorSize); + } + + @Override + public String toString() { + return this.bitmap.toString(); + } + + /** + * @return the vector size of this bloom filter + */ + public int getVectorSize() { + return this.vectorSize; + } + + @Override + public void write(DataOutput out) throws IOException { +
out.writeInt(VERSION); + out.writeInt(this.nbHash); + out.writeByte(this.hashAlgorithm); + out.writeInt(this.vectorSize); + this.bitmap.runOptimize(); + ByteArrayOutputStream bos = compressBitmap(); + LOG.info("Compressed bitmap from " + String.format("%,8d bytes", this.bitmap.getSizeInBytes()) + + " to "+ String.format("%,8d bytes", bos.size())); + out.writeInt(bos.size()); + out.write(bos.toByteArray()); + } + + @Override + public void readFields(DataInput in) throws IOException { + int ver = in.readInt(); + if (ver == VERSION) { + this.nbHash = in.readInt(); + this.hashAlgorithm = in.readByte(); + } else { + throw new IOException("Unsupported version: " + ver); + } + this.vectorSize = in.readInt(); + this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashAlgorithm); + this.bitmap = new RoaringBitmap(); + int compressedSize = in.readInt(); + byte[] buf = new byte[compressedSize]; + in.readFully(buf); + this.bitmap.deserialize(decompressBitmap(buf)); + } + /** Builds a new filter by running readFields over the bytes held by the given DataByteArray. */ + public static BloomFilter bloomIn(DataByteArray b) throws IOException { + DataInputStream dis = new DataInputStream(new + ByteArrayInputStream(b.get())); + BloomFilter f = new BloomFilter(); + f.readFields(dis); + return f; + } + private ByteArrayOutputStream compressBitmap() throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + BZip2Codec bzip = new BZip2Codec(); + bzip.setConf(new Configuration(false)); + CompressionOutputStream compressionOut = bzip.createOutputStream(bos); + DataOutputStream dos = new DataOutputStream(compressionOut); + this.bitmap.serialize(dos); + compressionOut.finish(); + dos.flush(); + return bos; + } + + private DataInput decompressBitmap(byte[] buffer) throws IOException { + ByteArrayInputStream deCompressedDataBuffer = new ByteArrayInputStream(buffer, 0, buffer.length); + BZip2Codec bzip = new BZip2Codec(); + bzip.setConf(new Configuration(false)); + CompressionInputStream compressionIn = bzip.createInputStream(deCompressedDataBuffer); +
DataInputStream inflateIn = new DataInputStream(compressionIn); + return inflateIn; + } +} Added: pig/trunk/src/org/apache/pig/impl/bloom/Hash.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/Hash.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/Hash.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/Hash.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.impl.bloom; +/** Base class for the bloom filter hash algorithms, with helpers that map a hash type name or constant to an implementation. */ +public abstract class Hash { + public static final int MURMUR = 1; + public static final int MURMUR3 = MURMUR; + public static final int MURMUR3KM = 2; + public static final int JENKINS = 3; + + public static int parseHashType(String hashType) { + if("murmur".equalsIgnoreCase(hashType)) { + return MURMUR3; + } else if("murmur3km".equalsIgnoreCase(hashType)) { + return MURMUR3KM; + } else if("jenkins".equalsIgnoreCase(hashType)) { + return JENKINS; + } + throw new IllegalArgumentException("Hash Algorithm values must be one of - murmur, murmur3km, jenkins"); + } + + public static Hash getInstance(int hashType) { + switch (hashType) { + case MURMUR: + return new Murmur3Hash(); + case MURMUR3KM: + return new KirschMitzenmacherHash(); + case JENKINS: + return new JenkinsHash(); + } + throw new IllegalArgumentException("Hash type values must be one of - 1 (murmur), 2 (murmur3km), 3 (jenkins)"); + } + + /** + * @param bytes The bytes to hash + * @param maxValue The upper bound for hashed values (the implementations in this package reduce each hash modulo this value) + * @param nbHash The number of hashed values to produce + * @return An array holding the nbHash hashed values + */ + public abstract int[] hash(byte[] bytes, int maxValue, int nbHash); + +} Added: pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/HashFunction.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.impl.bloom; + +import org.apache.hadoop.util.bloom.Key; + +public class HashFunction { + /** The number of hashed values. */ + protected int nbHash; + /** The upper bound handed to the hashing algorithm for returned values. */ + protected int maxValue; + /** Hashing algorithm to use. */ + protected Hash hashAlgorithm; + /** + * Constructor. + * <p> + * Builds a hash function that returns a fixed number of hashed values, bounded by a given maximum. + * @param maxValue The upper bound for returned values; must be &gt; 0. + * @param nbHash The number of resulting hashed values; must be &gt; 0. + * @param hashAlgorithm type of the hashing algorithm (see {@link Hash}); resolved through {@link Hash#getInstance(int)}. + */ + public HashFunction(int maxValue, int nbHash, int hashAlgorithm) { + if (maxValue <= 0) { + throw new IllegalArgumentException("maxValue must be > 0"); + } + if (nbHash <= 0) { + throw new IllegalArgumentException("nbHash must be > 0"); + } + + this.maxValue = maxValue; + this.nbHash = nbHash; + this.hashAlgorithm = Hash.getInstance(hashAlgorithm); + } + + /** Clears <i>this</i> hash function. Currently a no-op. */ + public void clear() { + } + + /** + * Hashes a specified key into several integers. + * @param k The specified key; its byte buffer must be non-null and non-empty. + * @return The array of hashed values. 
+ */ + public int[] hash(Key k){ + byte[] b = k.getBytes(); + if (b == null) { + throw new NullPointerException("buffer reference is null"); + } + if (b.length == 0) { + throw new IllegalArgumentException("key length must be > 0"); + } + return hashAlgorithm.hash(b, maxValue, nbHash); + } +} \ No newline at end of file Added: pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/HashProvider.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/* + * Copyright Felix Gessert and Florian Bücklers. All rights reserved. 
Permission is hereby granted, + * free of charge, to any person obtaining a copy of this software and associated documentation files + * (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the + * Software, and to permit persons to whom the Software is furnished to do so, subject to the following + * conditions: + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ */ +package org.apache.pig.impl.bloom; + +import java.util.function.BiFunction; + +/** + * Murmur3-based hash helpers for the bloom filter. Taken from https://github.com/Baqend/Orestes-Bloomfilter + */ +public class HashProvider { + + public static int[] hashMurmur3(byte[] value, int m, int k) { + return rejectionSample(HashProvider::murmur3_signed, value, m, k); + } + + public static int[] hashKirschMitzenmacher(byte[] value, int m, int k) { + int[] result = new int[k]; + long hash1 = murmur3(0, value); + long hash2 = murmur3((int) hash1, value); + for (int i = 0; i < k; i++) { + result[i] = (int) ((hash1 + i * hash2) % m); + } + return result; + } + + private static long murmur3(int seed, byte[] bytes) { + return Integer.toUnsignedLong(murmur3_signed(seed, bytes)); + } + + private static int murmur3_signed(int seed, byte[] bytes) { + int h1 = seed; + //Standard in Guava + int c1 = 0xcc9e2d51; + int c2 = 0x1b873593; + int len = bytes.length; + int i = 0; + + while (len >= 4) { + //process() + int k1 = (bytes[i++] & 0xFF); + k1 |= (bytes[i++] & 0xFF) << 8; + k1 |= (bytes[i++] & 0xFF) << 16; + k1 |= (bytes[i++] & 0xFF) << 24; + + k1 *= c1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= c2; + + h1 ^= k1; + h1 = Integer.rotateLeft(h1, 13); + h1 = h1 * 5 + 0xe6546b64; + + len -= 4; + } + + //processingRemaining() + int k1 = 0; + switch (len) { + case 3: + k1 ^= (bytes[i + 2] & 0xFF) << 16; + // fall through + case 2: + k1 ^= (bytes[i + 1] & 0xFF) << 8; + // fall through + case 1: + k1 ^= (bytes[i] & 0xFF); + + k1 *= c1; + k1 = Integer.rotateLeft(k1, 15); + k1 *= c2; + h1 ^= k1; + } + i += len; + + //makeHash() + h1 ^= i; + + h1 ^= h1 >>> 16; + h1 *= 0x85ebca6b; + h1 ^= h1 >>> 13; + h1 *= 0xc2b2ae35; + h1 ^= h1 >>> 16; + + return h1; + } + + /** + * Performs rejection sampling on a random 32bit Java int (sampled from Integer.MIN_VALUE to Integer.MAX_VALUE). + * + * @param random the 32bit int to down-sample + * @param m size of the output range + * @return the number down-sampled to the interval [0, m), or -1 if it has to be rejected. 
+ */ + private static int rejectionSample(int random, int m) { + random = Math.abs(random); + if (random > (2147483647 - 2147483647 % m) + || random == Integer.MIN_VALUE) + return -1; + else + return random % m; + } + /** Produces k hashes by repeatedly re-hashing the value with the previous hash as seed, keeping only the samples that are not rejected. */ + private static int[] rejectionSample(BiFunction<Integer, byte[], Integer> hashFunction, byte[] value, int m, int k) { + int[] hashes = new int[k]; + int seed = 0; + int pos = 0; + while (pos < k) { + seed = hashFunction.apply(seed, value); + int hash = rejectionSample(seed, m); + if (hash != -1) { + hashes[pos++] = hash; + } + } + return hashes; + } + +} Added: pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/JenkinsHash.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.impl.bloom; +/** Hash backed by Hadoop's JenkinsHash: each hash result seeds the next round and is reduced modulo maxValue to a non-negative value. */ +public class JenkinsHash extends Hash { + + private org.apache.hadoop.util.hash.Hash hashFunction = org.apache.hadoop.util.hash.JenkinsHash.getInstance(); + + @Override + public int[] hash(byte[] bytes, int maxValue, int nbHash) { + int[] result = new int[nbHash]; + for (int i = 0, initval = 0; i < nbHash; i++) { + initval = hashFunction.hash(bytes, initval); + result[i] = Math.abs(initval % maxValue); + } + return result; + } + +} Added: pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.impl.bloom; + +public class KirschMitzenmacherHash extends Hash { + + @Override + public int[] hash(byte[] bytes, int maxValue, int numHash) { + return HashProvider.hashKirschMitzenmacher(bytes, maxValue , numHash); + } + +} Added: pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java?rev=1843689&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java (added) +++ pig/trunk/src/org/apache/pig/impl/bloom/Murmur3Hash.java Fri Oct 12 16:57:50 2018 @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.impl.bloom; + +public class Murmur3Hash extends Hash { + @Override + public int[] hash(byte[] bytes, int maxValue, int numHash) { + return HashProvider.hashMurmur3(bytes, maxValue, numHash); + } +} Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Fri Oct 12 16:57:50 2018 @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.impl.PigContext; +import org.apache.pig.org.roaringbitmap.RoaringBitmap; import org.apache.tools.bzip2r.BZip2Constants; import org.joda.time.DateTime; @@ -66,7 +67,8 @@ public class JarManager { BZIP2R(BZip2Constants.class), AUTOMATON(Automaton.class), ANTLR(CommonTokenStream.class), - JODATIME(DateTime.class); + JODATIME(DateTime.class), + SHADED_ROARING_BITMAP(RoaringBitmap.class); private final Class pkgClass; Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1843689&r1=1843688&r2=1843689&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original) +++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Fri Oct 12 16:57:50 2018 @@ -130,7 +130,7 @@ public class TestJobControlCompiler { // verifying the jar gets on distributed cache Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf); // guava jar is not shipped with Hadoop 2.x - Assert.assertEquals("size for 
"+Arrays.toString(fileClassPaths), 5, fileClassPaths.length); + Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 6, fileClassPaths.length); Path distributedCachePath = fileClassPaths[0]; Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName()); // hadoop bug requires path to not contain hdfs://hotname in front @@ -234,11 +234,11 @@ public class TestJobControlCompiler { // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar System.out.println("cache.files= " + Arrays.toString(cacheURIs)); System.out.println("classpath.files= " + Arrays.toString(fileClassPaths)); - // Default jars - 5 (pig, antlr, joda-time, automaton) + // Default jars - 5 (pig, antlr, joda-time, automaton, roaring-bitmap-shaded) // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar - Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9, + Assert.assertEquals("size 10 for " + Arrays.toString(cacheURIs), 10, Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); - Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9, + Assert.assertEquals("size 10 for " + Arrays.toString(fileClassPaths), 10, Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); // Count occurrences of the resources Map<String, Integer> occurrences = new HashMap<String, Integer>(); @@ -248,7 +248,7 @@ public class TestJobControlCompiler { val = (val == null) ? 1 : ++val; occurrences.put(cacheURI.toString(), val); } - Assert.assertEquals(9, occurrences.size()); + Assert.assertEquals(10, occurrences.size()); for (String file : occurrences.keySet()) { // check that only single occurrence even though we added once to dist cache (simulating via Oozie)