Updated Branches: refs/heads/master 4683a8d49 -> 154c8fbd3
CRUNCH-189: Clean up Union integration test cases. Unify PTableUnionIT and UnionGbkIT into UnionIT. Rewrite tests for readability and to verify results. Remove dependencies on Gutenberg-licensed files. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/154c8fbd Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/154c8fbd Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/154c8fbd Branch: refs/heads/master Commit: 154c8fbd3c345e06fdb392a0a5b83f1fba5c7b4f Parents: 4683a8d Author: Matthias Friedrich <[email protected]> Authored: Fri Mar 29 21:03:56 2013 +0100 Committer: Matthias Friedrich <[email protected]> Committed: Mon Apr 1 20:17:54 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/crunch/test/TemporaryPath.java | 3 +- .../it/java/org/apache/crunch/PTableUnionIT.java | 99 ----------- .../src/it/java/org/apache/crunch/UnionGbkIT.java | 117 ------------- crunch/src/it/java/org/apache/crunch/UnionIT.java | 136 +++++++++++++++ .../src/it/java/org/apache/crunch/test/Tests.java | 20 ++- .../org/apache/crunch/UnionITData/src1.txt | 5 + .../org/apache/crunch/UnionITData/src2.txt | 3 + 7 files changed, 164 insertions(+), 219 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java ---------------------------------------------------------------------- diff --git a/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java b/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java index a4b07b9..1721eca 100644 --- a/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java +++ b/crunch-test/src/main/java/org/apache/crunch/test/TemporaryPath.java @@ -110,7 +110,8 @@ public final class TemporaryPath extends ExternalResource { * Copy a classpath resource to {@link
File}. */ public File copyResourceFile(String resourceName) throws IOException { - File dest = new File(tmp.getRoot(), resourceName); + String baseName = new File(resourceName).getName(); + File dest = new File(tmp.getRoot(), baseName); copy(resourceName, dest); return dest; } http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java b/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java deleted file mode 100644 index 94c548f..0000000 --- a/crunch/src/it/java/org/apache/crunch/PTableUnionIT.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch; - -import static org.junit.Assert.assertNotNull; - -import org.apache.crunch.PCollection; -import org.apache.crunch.PTable; -import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.avro.Avros; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - - - -public class PTableUnionIT { - - public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> { - - private static final long serialVersionUID = 5517897875971194220L; - - @Override - public void process(String input, Emitter<Pair<String, String>> emitter) { - if (input.length() > 0) { - emitter.emit(Pair.of(input.substring(0, 1), input)); - } - } - } - - @Rule - public TemporaryPath tmpDir = TemporaryPaths.create(); - - protected MRPipeline pipeline; - - @Before - public void setUp() { - pipeline = new MRPipeline(this.getClass(), tmpDir.getDefaultConfiguration()); - } - - @After - public void tearDown() { - pipeline.done(); - } - - @Test - public void tableUnionMaterializeNPE() throws Exception { - PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt")); - PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt")); - lorum.materialize(); - - PTable<String, String> wordsByFirstLetter = - words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings())); - PTable<String, String> lorumByFirstLetter = - lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), Avros.tableOf(Avros.strings(), Avros.strings())); - - @SuppressWarnings("unchecked") - PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter); - - assertNotNull(union.materialize().iterator().next()); - } - - @Test - public void collectionUnionMaterializeNPE() throws Exception { - 
PCollection<String> words = pipeline.readTextFile(tmpDir.copyResourceFileName("shakes.txt")); - PCollection<String> lorum = pipeline.readTextFile(tmpDir.copyResourceFileName("maugham.txt")); - lorum.materialize(); - - IdentityFn<String> identity = IdentityFn.getInstance(); - words = words.parallelDo(identity, Avros.strings()); - lorum = lorum.parallelDo(identity, Avros.strings()); - - @SuppressWarnings("unchecked") - PCollection<String> union = words.union(lorum); - - union.materialize().iterator(); - - assertNotNull(union.materialize().iterator().next()); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java b/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java deleted file mode 100644 index 3937fe8..0000000 --- a/crunch/src/it/java/org/apache/crunch/UnionGbkIT.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch; - -import static org.junit.Assert.assertNotNull; - -import org.apache.crunch.PCollection; -import org.apache.crunch.PGroupedTable; -import org.apache.crunch.PTable; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.test.TemporaryPath; -import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.avro.Avros; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; - -public class UnionGbkIT { - - @Rule - public TemporaryPath tmpDir = TemporaryPaths.create(); - - MRPipeline pipeline; - - public static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> { - @Override - public void process(String input, Emitter<Pair<String, String>> emitter) { - if (input.length() > 0) { - emitter.emit(Pair.of(input.substring(0, 1), input)); - } - } - } - - public static class ConcatGroupFn extends DoFn<Pair<String, Iterable<String>>, String> { - @Override - public void process(Pair<String, Iterable<String>> input, Emitter<String> emitter) { - StringBuilder sb = new StringBuilder(); - for (String str : input.second()) { - sb.append(str); - } - emitter.emit(sb.toString()); - } - } - - @Before - public void setUp() { - pipeline = new MRPipeline(UnionGbkIT.class, tmpDir.getDefaultConfiguration()); - } - - @After - public void tearDown() { - pipeline.done(); - } - - @Test - public void tableOfUnionGbk() throws Exception { - PCollection<String> words = pipeline.readTextFile( - tmpDir.copyResourceFileName("shakes.txt")); - PCollection<String> lorum = pipeline.readTextFile( - tmpDir.copyResourceFileName("maugham.txt")); - lorum.materialize(); - - @SuppressWarnings("unchecked") - PCollection<String> union = words.union(lorum); - - PGroupedTable<String, String> groupedByFirstLetter = - union.parallelDo("byFirstLetter", new FirstLetterKeyFn(), - Avros.tableOf(Avros.strings(), Avros.strings())) - .groupByKey(); - PCollection<String> concatted = groupedByFirstLetter - 
.parallelDo("concat", new ConcatGroupFn(), Avros.strings()); - - assertNotNull(concatted.materialize().iterator()); - } - - @Test - public void unionOfTablesGbk() throws Exception { - PCollection<String> words = pipeline.readTextFile( - tmpDir.copyResourceFileName("shakes.txt")); - PCollection<String> lorum = pipeline.readTextFile( - tmpDir.copyResourceFileName("maugham.txt")); - lorum.materialize(); - - PTable<String, String> wordsByFirstLetter = - words.parallelDo("byFirstLetter", new FirstLetterKeyFn(), - Avros.tableOf(Avros.strings(), Avros.strings())); - PTable<String, String> lorumByFirstLetter = - lorum.parallelDo("byFirstLetter", new FirstLetterKeyFn(), - Avros.tableOf(Avros.strings(), Avros.strings())); - - @SuppressWarnings("unchecked") - PTable<String, String> union = wordsByFirstLetter.union(lorumByFirstLetter); - - PGroupedTable<String, String> groupedByFirstLetter = union.groupByKey(); - - PCollection<String> concatted = groupedByFirstLetter.parallelDo("concat", - new ConcatGroupFn(), Avros.strings()); - - assertNotNull(concatted.materialize().iterator()); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/UnionIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/UnionIT.java b/crunch/src/it/java/org/apache/crunch/UnionIT.java new file mode 100644 index 0000000..1c60a1b --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/UnionIT.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.Map; + +import org.apache.crunch.fn.Aggregators; +import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.test.Tests; +import org.apache.crunch.types.avro.Avros; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultiset; + + +public class UnionIT { + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + private MRPipeline pipeline; + private PCollection<String> words1; + private PCollection<String> words2; + + @Before + public void setUp() throws IOException { + pipeline = new MRPipeline(UnionIT.class, tmpDir.getDefaultConfiguration()); + words1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt"))); + words2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt"))); + } + + @After + public void tearDown() { + pipeline.done(); + } + + @Test + public void testUnion() throws Exception { + IdentityFn<String> identity = IdentityFn.getInstance(); + words1 = words1.parallelDo(identity, Avros.strings()); + words2 = words2.parallelDo(identity, Avros.strings()); + + PCollection<String> union = words1.union(words2); + + 
ImmutableMultiset<String> actual = ImmutableMultiset.copyOf(union.materialize()); + assertThat(actual.elementSet().size(), is(3)); + assertThat(actual.count("a1"), is(4)); + assertThat(actual.count("b2"), is(2)); + assertThat(actual.count("c3"), is(2)); + } + + @Test + public void testTableUnion() throws IOException { + PTable<String, String> words1ByFirstLetter = byFirstLetter(words1); + PTable<String, String> words2ByFirstLetter = byFirstLetter(words2); + + PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter); + + ImmutableMultiset<Pair<String, String>> actual = ImmutableMultiset.copyOf(union.materialize()); + + assertThat(actual.elementSet().size(), is(3)); + assertThat(actual.count(Pair.of("a", "1")), is(4)); + assertThat(actual.count(Pair.of("b", "2")), is(2)); + assertThat(actual.count(Pair.of("c", "3")), is(2)); + } + + @Test + public void testUnionThenGroupByKey() throws IOException { + PCollection<String> union = words1.union(words2); + + PGroupedTable<String, String> grouped = byFirstLetter(union).groupByKey(); + + Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true)) + .materializeToMap(); + + Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33"); + assertThat(actual, is(expected)); + } + + @Test + public void testTableUnionThenGroupByKey() throws IOException { + PTable<String, String> words1ByFirstLetter = byFirstLetter(words1); + PTable<String, String> words2ByFirstLetter = byFirstLetter(words2); + + PTable<String, String> union = words1ByFirstLetter.union(words2ByFirstLetter); + + PGroupedTable<String, String> grouped = union.groupByKey(); + + Map<String, String> actual = grouped.combineValues(Aggregators.STRING_CONCAT("", true)) + .materializeToMap(); + + Map<String, String> expected = ImmutableMap.of("a", "1111", "b", "22", "c", "33"); + assertThat(actual, is(expected)); + } + + + private static PTable<String, String> byFirstLetter(PCollection<String> values) { + 
return values.parallelDo("byFirstLetter", new FirstLetterKeyFn(), + Avros.tableOf(Avros.strings(), Avros.strings())); + } + + private static class FirstLetterKeyFn extends DoFn<String, Pair<String, String>> { + @Override + public void process(String input, Emitter<Pair<String, String>> emitter) { + if (input.length() > 1) { + emitter.emit(Pair.of(input.substring(0, 1), input.substring(1))); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/java/org/apache/crunch/test/Tests.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/test/Tests.java b/crunch/src/it/java/org/apache/crunch/test/Tests.java index f2c7a86..4c979af 100644 --- a/crunch/src/it/java/org/apache/crunch/test/Tests.java +++ b/crunch/src/it/java/org/apache/crunch/test/Tests.java @@ -17,6 +17,8 @@ */ package org.apache.crunch.test; +import static com.google.common.base.Preconditions.checkNotNull; + import java.util.Collection; import org.apache.crunch.Pipeline; @@ -46,10 +48,24 @@ public final class Tests { * @throws IllegalArgumentException Thrown if the resource doesn't exist */ public static String pathTo(Object testCase, String resourceName) { + String qualifiedName = resource(testCase, resourceName); + return Resources.getResource(qualifiedName).getFile(); + } + + /** + * This doesn't check whether the resource exists! + * + * @param testCase + * @param resourceName + * @return The path to the resource (never null) + */ + public static String resource(Object testCase, String resourceName) { + checkNotNull(testCase); + checkNotNull(resourceName); + // Note: We append "Data" because otherwise Eclipse would complain about the // the case's class name clashing with the resource directory's name. 
- String path = testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName; - return Resources.getResource(path).getFile(); + return testCase.getClass().getName().replaceAll("\\.", "/") + "Data/" + resourceName; } /** http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt b/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt new file mode 100644 index 0000000..a92974b --- /dev/null +++ b/crunch/src/it/resources/org/apache/crunch/UnionITData/src1.txt @@ -0,0 +1,5 @@ +a1 +b2 +a1 +a1 +b2 http://git-wip-us.apache.org/repos/asf/crunch/blob/154c8fbd/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt b/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt new file mode 100644 index 0000000..9363398 --- /dev/null +++ b/crunch/src/it/resources/org/apache/crunch/UnionITData/src2.txt @@ -0,0 +1,3 @@ +c3 +a1 +c3
