Repository: crunch Updated Branches: refs/heads/master cddba97c5 -> 06688d55e
CRUNCH-527 Use hash smearing for partitioning Apply a supplemental "smearing" function to hash codes when partitioning data. The UniformHashPartitioner (which applies this technique) is now the default partitioner. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/06688d55 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/06688d55 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/06688d55 Branch: refs/heads/master Commit: 06688d55ee306a52d243b10de017ebeec913577a Parents: cddba97 Author: Gabriel Reid <[email protected]> Authored: Tue May 26 09:42:44 2015 +0200 Committer: Gabriel Reid <[email protected]> Committed: Tue May 26 09:42:44 2015 +0200 ---------------------------------------------------------------------- .../java/org/apache/crunch/GroupingOptions.java | 3 +- .../impl/mr/run/UniformHashPartitioner.java | 34 ++++++++++++++ .../org/apache/crunch/lib/join/JoinUtils.java | 8 ++-- .../java/org/apache/crunch/util/HashUtil.java | 42 +++++++++++++++++ .../impl/mr/run/UniformHashPartitionerTest.java | 47 ++++++++++++++++++++ .../lib/AvroIndexedRecordPartitionerTest.java | 4 +- .../lib/TupleWritablePartitionerTest.java | 2 +- 7 files changed, 131 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java index 59abe27..1616a46 100644 --- a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java +++ b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import 
org.apache.crunch.impl.mr.run.UniformHashPartitioner; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Partitioner; @@ -127,7 +128,7 @@ public class GroupingOptions implements Serializable { * */ public static class Builder { - private Class<? extends Partitioner> partitionerClass; + private Class<? extends Partitioner> partitionerClass = UniformHashPartitioner.class; private Class<? extends RawComparator> groupingComparatorClass; private Class<? extends RawComparator> sortComparatorClass; private boolean requireSortedKeys; http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java new file mode 100644 index 0000000..c5b8459 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.crunch.impl.mr.run; + +import org.apache.crunch.util.HashUtil; +import org.apache.hadoop.mapreduce.Partitioner; + +/** + * Hash partitioner which applies a supplemental hashing function to the hash code of each key to improve the + * distribution of keys over partitions. + */ +public class UniformHashPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> { + + @Override + public int getPartition(KEY key, VALUE value, int numPartitions) { + return ((HashUtil.smearHash(key.hashCode()) & Integer.MAX_VALUE) % numPartitions); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java index 02963a7..ba532f2 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java @@ -21,14 +21,12 @@ import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.BinaryData; import org.apache.avro.mapred.AvroJob; -import org.apache.avro.mapred.AvroKey; -import org.apache.avro.mapred.AvroValue; import org.apache.avro.mapred.AvroWrapper; -import org.apache.avro.reflect.ReflectData; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroMode; import org.apache.crunch.types.writable.TupleWritable; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.crunch.util.HashUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.DataInputBuffer; @@ -61,7 +59,7 @@ public class JoinUtils { public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable> { @Override public int getPartition(TupleWritable key, 
Writable value, int numPartitions) { - return (key.get(0).hashCode() & Integer.MAX_VALUE) % numPartitions; + return (HashUtil.smearHash(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions; } } @@ -103,7 +101,7 @@ public class JoinUtils { } else { throw new UnsupportedOperationException("Unknown avro key type: " + key); } - return (record.get(0).hashCode() & Integer.MAX_VALUE) % numPartitions; + return (HashUtil.smearHash(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java b/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java new file mode 100644 index 0000000..aed1348 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.util; + +/** + * Utility methods for working with hash codes. 
+ */ +public class HashUtil { + + /** + * Applies a supplemental hashing function to an integer, increasing variability in lower-order bits. + * This method is intended to avoid collisions in functions which rely on variance in the lower bits of a hash + * code (e.g. hash partitioning). + */ + // The following comments and code are taken directly from Guava's com.google.common.collect.Hashing class + // This method was written by Doug Lea with assistance from members of JCP + // JSR-166 Expert Group and released to the public domain, as explained at + // http://creativecommons.org/licenses/publicdomain + // + // As of 2010/06/11, this method is identical to the (package private) hash + // method in OpenJDK 7's java.util.HashMap class. + public static int smearHash(int hashCode) { + hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); + return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java new file mode 100644 index 0000000..1518c1b --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.crunch.impl.mr.run; + +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.*; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; +import org.junit.Test; + +public class UniformHashPartitionerTest { + + private static final UniformHashPartitioner INSTANCE = new UniformHashPartitioner(); + + // Simple test to ensure that the general idea behind this partitioner is working. + // We create 100 keys that are all multiples of 10 (and thus identical modulo 10), partition them into 10 buckets, + // and then verify that every bucket got more than 5 of the keys. The default HashPartitioner would put + // everything in the same bucket. 
+ @Test + public void testGetPartition() { + Multiset<Integer> partitionCounts = HashMultiset.create(); + final int NUM_PARTITIONS = 10; + for (int i = 0; i < 1000; i += 10) { + partitionCounts.add(INSTANCE.getPartition(i, i, NUM_PARTITIONS)); + } + for (int partitionNumber = 0; partitionNumber < NUM_PARTITIONS; partitionNumber++) { + assertThat(partitionCounts.count(partitionNumber), greaterThan(5)); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java index ac8b89c..36bbd12 100644 --- a/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java @@ -50,7 +50,7 @@ public class AvroIndexedRecordPartitionerTest { IndexedRecord indexedRecord = new MockIndexedRecord(-3); AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord); - assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5)); + assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 5)); assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), 2)); } @@ -59,7 +59,7 @@ public class AvroIndexedRecordPartitionerTest { IndexedRecord indexedRecord = new MockIndexedRecord(Integer.MIN_VALUE); AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord); - assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE)); + assertEquals(151558288, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(), Integer.MAX_VALUE)); } /** 
http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java index 86c7437..6a3c0a8 100644 --- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java @@ -43,7 +43,7 @@ public class TupleWritablePartitionerTest { IntWritable intWritable = new IntWritable(3); BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable)); TupleWritable key = new TupleWritable(new Writable[] { bw }); - assertEquals(2, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5)); + assertEquals(4, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5)); assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2)); } }
