Updated Branches: refs/heads/master 72caac7d8 -> 1e06362b9
CRUNCH-121: Fix bug in full outer and left outer joins. Contributed by John Jensen. Signed-off-by: Josh Wills <[email protected]> Signed-off-by: Kiyan Ahmadizadeh <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1e06362b Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1e06362b Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1e06362b Branch: refs/heads/master Commit: 1e06362b9b8ff43d977d166fc4916e41cbeb3ae3 Parents: 72caac7 Author: John Jensen <[email protected]> Authored: Mon Nov 26 20:05:03 2012 -0800 Committer: Kiyan Ahmadizadeh <[email protected]> Committed: Tue Nov 27 16:38:14 2012 -0800 ---------------------------------------------------------------------- .../apache/crunch/lib/join/FullOuterJoinFn.java | 2 +- .../apache/crunch/lib/join/LeftOuterJoinFn.java | 2 +- .../lib/join/BrokenLeftAndOuterJoinTest.java | 90 +++++++++++++++ 3 files changed, 92 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1e06362b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java index 834396a..c0ce727 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/FullOuterJoinFn.java @@ -56,7 +56,7 @@ public class FullOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) { if (!key.equals(lastKey)) { // Make sure that left side gets emitted. - if (0 == lastId && 0 == id) { + if (0 == lastId) { for (U u : leftValues) { emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null))); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1e06362b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java index 18288a4..731c496 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java +++ b/crunch/src/main/java/org/apache/crunch/lib/join/LeftOuterJoinFn.java @@ -56,7 +56,7 @@ public class LeftOuterJoinFn<K, U, V> extends JoinFn<K, U, V> { public void join(K key, int id, Iterable<Pair<U, V>> pairs, Emitter<Pair<K, Pair<U, V>>> emitter) { if (!key.equals(lastKey)) { // Make sure that left side always gets emitted. - if (0 == lastId && 0 == id) { + if (0 == lastId) { for (U u : leftValues) { emitter.emit(Pair.of(lastKey, Pair.of(u, (V) null))); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1e06362b/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java b/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java new file mode 100644 index 0000000..7e2e444 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/lib/join/BrokenLeftAndOuterJoinTest.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.lib.join; + +import static org.apache.crunch.test.StringWrapper.wrap; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; + +import java.util.List; + +import org.apache.crunch.Emitter; +import org.apache.crunch.Pair; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class BrokenLeftAndOuterJoinTest { + + List<Pair<StringWrapper, String>> createValuePairList(StringWrapper leftValue, String rightValue) { + Pair<StringWrapper, String> valuePair = Pair.of(leftValue, rightValue); + List<Pair<StringWrapper, String>> valuePairList = Lists.newArrayList(); + valuePairList.add(valuePair); + return valuePairList; + } + + @Test + public void testOuterJoin() { + JoinFn<StringWrapper, StringWrapper, String> joinFn = new LeftOuterJoinFn<StringWrapper, StringWrapper, String>( + Avros.reflects(StringWrapper.class), + Avros.reflects(StringWrapper.class)); + joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration())); + joinFn.initialize(); + Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter = mock(Emitter.class); + + StringWrapper key = new StringWrapper(); + StringWrapper leftValue = new StringWrapper(); + key.setValue("left-only"); + leftValue.setValue("left-only-left"); + joinFn.join(key, 0, createValuePairList(leftValue, null), emitter); + + key.setValue("right-only"); + joinFn.join(key, 1, createValuePairList(null, "right-only-right"), emitter); + + verify(emitter).emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null))); + verifyNoMoreInteractions(emitter); + } + + @Test + public void testFullJoin() { + JoinFn<StringWrapper, StringWrapper, String> joinFn = new FullOuterJoinFn<StringWrapper, StringWrapper, String>( + Avros.reflects(StringWrapper.class), + Avros.reflects(StringWrapper.class)); + joinFn.setContext(CrunchTestSupport.getTestContext(new Configuration())); + joinFn.initialize(); + Emitter<Pair<StringWrapper, Pair<StringWrapper, String>>> emitter = mock(Emitter.class); + + StringWrapper key = new StringWrapper(); + StringWrapper leftValue = new StringWrapper(); + key.setValue("left-only"); + leftValue.setValue("left-only-left"); + joinFn.join(key, 0, createValuePairList(leftValue, null), emitter); + + key.setValue("right-only"); + joinFn.join(key, 1, createValuePairList(null, "right-only-right"), emitter); + + verify(emitter).emit(Pair.of(wrap("left-only"), Pair.of(wrap("left-only-left"), (String) null))); + verify(emitter).emit(Pair.of(wrap("right-only"), Pair.of((StringWrapper)null, "right-only-right"))); + verifyNoMoreInteractions(emitter); + } +}
