Repository: crunch Updated Branches: refs/heads/master fa04e3c7b -> 0d31415e4
CRUNCH-596 Support right-outer bloom join Signed-off-by: Gabriel Reid <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/0d31415e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/0d31415e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/0d31415e Branch: refs/heads/master Commit: 0d31415e4a0d86e1d2e1648ad9da8dca49f5e985 Parents: fa04e3c Author: tworec <[email protected]> Authored: Fri Mar 4 18:58:01 2016 +0100 Committer: Gabriel Reid <[email protected]> Committed: Mon Mar 21 21:07:22 2016 +0100 ---------------------------------------------------------------------- .../lib/join/BloomFilterFullOuterJoinIT.java | 49 +++++++++++ .../lib/join/BloomFilterRightOuterJoinIT.java | 49 +++++++++++ .../java/org/apache/crunch/fn/ExtractKeyFn.java | 5 +- .../lib/join/BloomFilterJoinStrategy.java | 88 ++++++++++++++++---- 4 files changed, 174 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/0d31415e/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterFullOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterFullOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterFullOuterJoinIT.java new file mode 100644 index 0000000..18a0100 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterFullOuterJoinIT.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class BloomFilterFullOuterJoinIT extends AbstractFullOuterJoinIT { + + private static String saveTempDir; + + @BeforeClass + public static void setUpClass(){ + + // Ensure a consistent temporary directory for use of the DistributedCache. + + // The DistributedCache technically isn't supported when running in local mode, and the default + // temporary directory "/tmp" is used as its location. This typically only causes an issue when + // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary + // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms.
+ saveTempDir = System.setProperty("java.io.tmpdir", "/tmp"); + } + + @AfterClass + public static void tearDownClass(){ + System.setProperty("java.io.tmpdir", saveTempDir); + } + + @Override + protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() { + return new BloomFilterJoinStrategy<K, U, V>(20000); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/0d31415e/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterRightOuterJoinIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterRightOuterJoinIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterRightOuterJoinIT.java new file mode 100644 index 0000000..750bd35 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/BloomFilterRightOuterJoinIT.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.lib.join; + +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class BloomFilterRightOuterJoinIT extends AbstractRightOuterJoinIT { + + private static String saveTempDir; + + @BeforeClass + public static void setUpClass(){ + + // Ensure a consistent temporary directory for use of the DistributedCache. + + // The DistributedCache technically isn't supported when running in local mode, and the default + // temporary directory "/tmp" is used as its location. This typically only causes an issue when + // running integration tests on Mac OS X, as OS X doesn't use "/tmp" as its default temporary + // directory. The following call ensures that "/tmp" is used as the temporary directory on all platforms. + saveTempDir = System.setProperty("java.io.tmpdir", "/tmp"); + } + + @AfterClass + public static void tearDownClass(){ + System.setProperty("java.io.tmpdir", saveTempDir); + } + + @Override + protected <K, U, V> JoinStrategy<K, U, V> getJoinStrategy() { + return new BloomFilterJoinStrategy<K, U, V>(20000); + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/0d31415e/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java index 2d2776e..e88a5b1 100644 --- a/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java @@ -23,8 +23,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.TaskInputOutputContext; /** - * Wrapper function for converting a {@code MapFn} into a key-value pair that is - * used to convert from a {@code PCollection<V>} to a {@code PTable<K, V>}.
+ * Wrapper function for converting a key-from-value extractor {@code MapFn<V, K>} into a + * key-value pair extractor that is used to convert from a {@code PCollection<V>} to a + * {@code PTable<K, V>}. */ public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> { http://git-wip-us.apache.org/repos/asf/crunch/blob/0d31415e/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java index 8fda17c..7ff7490 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/BloomFilterJoinStrategy.java @@ -33,6 +33,9 @@ import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.ParallelDoOptions; import org.apache.crunch.ReadableData; +import org.apache.crunch.fn.ExtractKeyFn; +import org.apache.crunch.fn.FilterFns; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroMode; @@ -57,6 +60,10 @@ import org.apache.hadoop.util.hash.Hash; * This strategy is useful in cases where the right-side table contains many keys that are not * present in the left-side table. In this case, the use of the Bloom filter avoids a * potentially costly shuffle phase for data that would never be joined to the left side. + * <p> + * Implementation Note: right and full outer join types are handled by splitting the right-side + * table (the bigger one) into two disjoint streams: negatively filtered (right outer part) + * and positively filtered (passed to the delegate strategy).
*/ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { @@ -120,30 +127,42 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { @Override public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) { - if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) { - throw new IllegalStateException("JoinType " + joinType + " is not supported for BloomFilter joins"); - } - - PTable<K,V> filteredRightSide; PType<BloomFilter> bloomFilterType = getBloomFilterType(left.getTypeFamily()); PCollection<BloomFilter> bloomFilters = left.keys().parallelDo( "Create bloom filters", - new CreateBloomFilterFn(vectorSize, nbHash, left.getKeyType()), + new CreateBloomFilterFn<>(vectorSize, nbHash, left.getKeyType()), bloomFilterType); ReadableData<BloomFilter> bloomData = bloomFilters.asReadable(true); - FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<K, V>( - bloomData, - vectorSize, nbHash, - left.getKeyType()); + FilterKeysWithBloomFilterFn<K, V> filterKeysFn = new FilterKeysWithBloomFilterFn<>( + bloomData, vectorSize, nbHash, left.getKeyType()); + + if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) { + right = right.parallelDo( + "disable deep copy", new DeepCopyDisablerFn<Pair<K, V>>(), right.getPTableType()); + } + + ParallelDoOptions options = ParallelDoOptions.builder() + .sourceTargets(bloomData.getSourceTargets()).build(); + PTable<K, V> filteredRightSide = right.parallelDo( + "Filter right-side with BloomFilters", + filterKeysFn, right.getPTableType(), options); - ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder(); - optionsBuilder.sourceTargets(bloomData.getSourceTargets()); + PTable<K, Pair<U, V>> leftJoinedWithFilteredRight = delegateJoinStrategy + .join(left, filteredRightSide, joinType); - filteredRightSide = right.parallelDo("Filter right side with BloomFilters", - filterKeysFn, 
right.getPTableType(), optionsBuilder.build()); + if (joinType == JoinType.INNER_JOIN || joinType == JoinType.LEFT_OUTER_JOIN) { + return leftJoinedWithFilteredRight; + } - return delegateJoinStrategy.join(left, filteredRightSide, joinType); + return leftJoinedWithFilteredRight.union( + right + .parallelDo( + "Negatively filter right-side with BloomFilters", + FilterFns.not(filterKeysFn), right.getPTableType(), options) + .mapValues( + "Right outer join: attach null as left-value", + new NullKeyFn<U, V>(), leftJoinedWithFilteredRight.getValueType())); } /** @@ -324,4 +343,43 @@ public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { } + /** + * Converts a value into a (null, value) pair. It is used to convert negatively filtered + * right-side values into the right outer join part, where the left-side value is null. + */ + private static class NullKeyFn<K, V> extends ExtractKeyFn<K, V> { + public NullKeyFn() { + super(new MapFn<V, K>() { + @Override public K map(V input) { + return null; + } + + @Override public float scaleFactor() { + return 0.0001f; + } + }); + } + } + + /** + * Right and full outer join types are handled by splitting the right-side table (the bigger one) + * into two disjoint streams: negatively filtered (right outer part) and positively filtered. + * To prevent concurrent modification, Crunch by default performs a deep copy of such a split + * stream (see {@link DoFn#disableDeepCopy()}), which introduces extra overhead. Since the Bloom + * filter directs every record to exactly one of these streams, making concurrent modification + * impossible, we can safely disable this feature. To achieve this we put the {@code right} PTable + * through a {@code parallelDo} call with this {@code DoFn}. + */ + private static class DeepCopyDisablerFn<T> extends MapFn<T, T> { + + @Override + public T map(T input) { + return input; + } + + @Override + public boolean disableDeepCopy() { + return true; + } + } }
