Updated Branches: refs/heads/master 630163014 -> a6df0ccac
CRUNCH-211 Add one-to-many join functionality Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a6df0cca Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a6df0cca Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a6df0cca Branch: refs/heads/master Commit: a6df0ccac245fc74b2c8fb81253527100e7e4a80 Parents: 6301630 Author: Gabriel Reid <[email protected]> Authored: Sun Jun 2 17:16:46 2013 +0200 Committer: Gabriel Reid <[email protected]> Committed: Sun Jun 9 23:56:07 2013 +0200 ---------------------------------------------------------------------- .../crunch/lib/join/DefaultJoinStrategy.java | 2 +- .../apache/crunch/lib/join/LeftOuterJoinFn.java | 2 +- .../apache/crunch/lib/join/OneToManyJoin.java | 120 +++++++++++++++++++ .../crunch/lib/join/OneToManyJoinTest.java | 107 +++++++++++++++++ 4 files changed, 229 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/a6df0cca/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java index 87b9495..d1cf3db 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java @@ -59,7 +59,7 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> { return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret); } - private PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) { + static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) { PTypeFamily ptf = left.getTypeFamily(); PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()), ptf.pairs(left.getValueType(), right.getValueType())); http://git-wip-us.apache.org/repos/asf/crunch/blob/a6df0cca/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java index 731c496..be04115 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java @@ -37,7 +37,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { private transient int lastId; private transient K lastKey; private transient List<U> leftValues; - + public LeftOuterJoinFn(PType<K> keyType, PType<U> leftValueType) { super(keyType, leftValueType); } http://git-wip-us.apache.org/repos/asf/crunch/blob/a6df0cca/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java new file mode 100644 index 0000000..7e92738 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import java.io.Serializable; + +import javax.annotation.Nullable; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PGroupedTable; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.lib.Join; +import org.apache.crunch.types.PType; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; + +/** + * Optimized join for situations where exactly one value is being joined with + * any other number of values based on a common key. + */ +public class OneToManyJoin { + + /** + * Performs a join on two tables, where the left table only contains a single + * value per key. + * <p> + * This method accepts a DoFn, which is responsible for converting the single + * left-side value and the iterable of right-side values into output values. + * <p> + * This method of joining is useful when there is a single context value that + * contains a large number of related values, and all related values must be + * brought together, with the quantity of the right-side values being too big + * to fit in memory. + * <p> + * If there are multiple values for the same key in the left-side table, only + * a single one will be used. + * + * @param left left-side table to join + * @param right right-side table to join + * @param postProcessFn DoFn to process the results of the join + * @param ptype type of the output of the postProcessFn + * @return the post-processed output of the join + */ + public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right, + DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) { + + PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right); + return grouped.parallelDo("One to many join " + grouped.getName(), + new OneToManyJoinFn<K, U, V, T>(left.getValueType(), postProcessFn), ptype); + } + + /** + * Handles post-processing the output of {@link Join#oneToManyJoin}. + */ + static class OneToManyJoinFn<K, U, V, T> extends DoFn<Pair<Pair<K, Integer>, Iterable<Pair<U, V>>>, T> { + + private PType<U> leftValueType; + private DoFn<Pair<U, Iterable<V>>, T> postProcessFn; + private SecondElementFunction<U, V> secondElementFunction; + private K currentKey; + private U leftValue; + + public OneToManyJoinFn(PType<U> leftValueType, DoFn<Pair<U, Iterable<V>>, T> postProcessFn) { + this.leftValueType = leftValueType; + this.postProcessFn = postProcessFn; + this.secondElementFunction = new SecondElementFunction<U, V>(); + } + + @Override + public void initialize() { + super.initialize(); + postProcessFn.initialize(); + leftValueType.initialize(getConfiguration()); + } + + @Override + public void process(Pair<Pair<K, Integer>, Iterable<Pair<U, V>>> input, Emitter<T> emitter) { + Pair<K, Integer> keyPair = input.first(); + if (keyPair.second() == 0) { + leftValue = leftValueType.getDetachedValue(input.second().iterator().next().first()); + currentKey = input.first().first(); + } else if (keyPair.second() == 1 && input.first().first().equals(currentKey)) { + postProcessFn.process(Pair.of(leftValue, wrapIterable(input.second())), emitter); + leftValue = null; + } + } + + private Iterable<V> wrapIterable(Iterable<Pair<U, V>> input) { + return Iterables.transform(input, secondElementFunction); + } + + private static class SecondElementFunction<U, V> implements Function<Pair<U, V>, V>, Serializable { + + @Override + public V apply(@Nullable Pair<U, V> input) { + return input.second(); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/a6df0cca/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java new file mode 100644 index 0000000..dad4c35 --- /dev/null +++ b/crunch-core/src/test/java/org/apache/crunch/lib/join/OneToManyJoinTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.crunch.MapFn; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pair; +import org.apache.crunch.impl.mem.MemPipeline; +import org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class OneToManyJoinTest { + + @Test + public void testOneToMany() { + PTable<Integer, String> left = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "one", 2, "two"); + PTable<Integer, String> right = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "1A", 1, "1B", 2, "2A", 2, "2B"); + + PCollection<Pair<String, String>> joined = OneToManyJoin.oneToManyJoin(left, right, new StringJoinFn(), + Avros.pairs(Avros.strings(), Avros.strings())); + + List<Pair<String, String>> expected = ImmutableList.of(Pair.of("one", "1A,1B"), Pair.of("two", "2A,2B")); + + assertEquals(expected, Lists.newArrayList(joined.materialize())); + + } + + @Test + public void testOneToMany_UnmatchedOnRightSide() { + PTable<Integer, String> left = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), 1, "one", 2, + "two"); + PTable<Integer, String> right = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), 2, "2A", 2, "2B"); + + PCollection<Pair<String, String>> joined = OneToManyJoin.oneToManyJoin(left, right, new StringJoinFn(), + Avros.pairs(Avros.strings(), Avros.strings())); + + List<Pair<String, String>> expected = ImmutableList.of(Pair.of("two", "2A,2B")); + + assertEquals(expected, Lists.newArrayList(joined.materialize())); + } + + @Test + public void testOneToMany_UnmatchedLeftSide() { + PTable<Integer, String> left = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), 2, "two"); + PTable<Integer, String> right = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), 1, "1A", 1, + "1B", 2, "2A", 2, "2B"); + + PCollection<Pair<String, String>> joined = OneToManyJoin.oneToManyJoin(left, right, new StringJoinFn(), + Avros.pairs(Avros.strings(), Avros.strings())); + + List<Pair<String, String>> expected = ImmutableList.of(Pair.of("two", "2A,2B")); + + assertEquals(expected, Lists.newArrayList(joined.materialize())); + } + + @Test + public void testOneToMany_MultipleValuesForSameKeyOnLeft() { + PTable<Integer, String> left = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "one", 2, "two", 1, "oneExtra"); + PTable<Integer, String> right = MemPipeline.typedTableOf(Avros.tableOf(Avros.ints(), Avros.strings()), + 1, "1A", 1, "1B", 2, "2A", 2, "2B"); + + PCollection<Pair<String, String>> joined = OneToManyJoin.oneToManyJoin(left, right, new StringJoinFn(), + Avros.pairs(Avros.strings(), Avros.strings())); + + List<Pair<String, String>> expected = ImmutableList.of(Pair.of("one", "1A,1B"), Pair.of("two", "2A,2B")); + + assertEquals(expected, Lists.newArrayList(joined.materialize())); + + } + + static class StringJoinFn extends MapFn<Pair<String, Iterable<String>>, Pair<String, String>> { + + @Override + public Pair<String, String> map(Pair<String, Iterable<String>> input) { + return Pair.of(input.first(), Joiner.on(',').join(input.second())); + } + + } + +}
