Updated Branches: refs/heads/master 3e513cfab -> 222dd76ac
CRUNCH-193: Working GenericArrayWritable impl Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/222dd76a Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/222dd76a Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/222dd76a Branch: refs/heads/master Commit: 222dd76ac2be2cdc786feeb9e75f86713ff8c559 Parents: 3e513cf Author: Josh Wills <[email protected]> Authored: Mon Apr 8 21:05:39 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Wed Apr 10 11:08:37 2013 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/lib/CogroupIT.java | 159 +++++---------- .../src/it/java/org/apache/crunch/test/Tests.java | 42 ++++ .../org/apache/crunch/lib/CogroupITData/src1.txt | 4 + .../org/apache/crunch/lib/CogroupITData/src2.txt | 4 + .../types/writable/GenericArrayWritable.java | 38 ++-- .../types/writable/GenericArrayWritableTest.java | 70 +++++++ 6 files changed, 192 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java index af3329f..4b28da7 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java @@ -17,157 +17,96 @@ */ package org.apache.crunch.lib; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; -import java.io.File; import java.io.IOException; -import java.nio.charset.Charset; import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.util.Map; import org.apache.crunch.DoFn; import org.apache.crunch.Emitter; -import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.PTable; import org.apache.crunch.Pair; -import org.apache.crunch.Pipeline; -import org.apache.crunch.fn.Aggregators; -import org.apache.crunch.fn.MapKeysFn; -import org.apache.crunch.fn.MapValuesFn; import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.From; -import org.apache.crunch.test.StringWrapper; -import org.apache.crunch.test.StringWrapper.StringToStringWrapperMapFn; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.test.Tests; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PTypeFamily; import org.apache.crunch.types.avro.AvroTypeFamily; -import org.apache.crunch.types.avro.Avros; import org.apache.crunch.types.writable.WritableTypeFamily; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.io.Files; +import com.google.common.collect.ImmutableMap; + public class CogroupIT { @Rule public TemporaryPath tmpDir = TemporaryPaths.create(); + private MRPipeline pipeline; + private PCollection<String> lines1; + private PCollection<String> lines2; - private static class WordSplit extends DoFn<String, Pair<String, Long>> { - @Override - public void process(String input, Emitter<Pair<String, Long>> emitter) { - for (String word : Splitter.on(' ').split(input)) { - emitter.emit(Pair.of(word, 1L)); - } - } + + @Before + public void setUp() throws IOException { + pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()); + lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt"))); + lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt"))); } - public static PTable<String, Long> join(PCollection<String> w1, PCollection<String> w2, PTypeFamily ptf) { - PTableType<String, Long> ntt = ptf.tableOf(ptf.strings(), ptf.longs()); - PTable<String, Long> ws1 = w1.parallelDo("ws1", new WordSplit(), ntt); - PTable<String, Long> ws2 = w2.parallelDo("ws2", new WordSplit(), ntt); - PTable<String, Pair<Collection<Long>, Collection<Long>>> cg = Cogroup.cogroup(ws1, ws2); - PTable<String, Long> sums = cg.parallelDo("wc", - new MapValuesFn<String, Pair<Collection<Long>, Collection<Long>>, Long>() { - @Override - public Long map(Pair<Collection<Long>, Collection<Long>> v) { - long sum = 0L; - for (Long value : v.first()) { - sum += value; - } - for (Long value : v.second()) { - sum += value; - } - return sum; - } - }, ntt); - return sums.parallelDo("firstletters", new MapKeysFn<String, String, Long>() { - @Override - public String map(String k1) { - if (k1.length() > 0) { - return k1.substring(0, 1).toLowerCase(); - } else { - return ""; - } - } - }, ntt).groupByKey().combineValues(Aggregators.SUM_LONGS()); + @After + public void tearDown() { + pipeline.done(); } @Test - public void testWritableJoin() throws Exception { - run(new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()), WritableTypeFamily.getInstance()); + public void testCogroupWritables() { + runCogroup(WritableTypeFamily.getInstance()); } @Test - public void testAvroJoin() throws Exception { - run(new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()), AvroTypeFamily.getInstance()); + public void testCogroupAvro() { + runCogroup(AvroTypeFamily.getInstance()); } - public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { - String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); - String maughamInputPath = tmpDir.copyResourceFileName("maugham.txt"); - File output = tmpDir.getFile("output"); + public void runCogroup(PTypeFamily ptf) { + PTableType<String, String> tt = ptf.tableOf(ptf.strings(), ptf.strings()); - PCollection<String> shakespeare = pipeline.read(From.textFile(shakesInputPath)); - PCollection<String> maugham = pipeline.read(From.textFile(maughamInputPath)); - pipeline.writeTextFile(join(shakespeare, maugham, typeFamily), output.getAbsolutePath()); - pipeline.done(); + PTable<String, String> kv1 = lines1.parallelDo("kv1", new KeyValueSplit(), tt); + PTable<String, String> kv2 = lines2.parallelDo("kv2", new KeyValueSplit(), tt); - File outputFile = new File(output, "part-r-00000"); - List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); - boolean passed = false; - for (String line : lines) { - if (line.equals("[j,705]")) { - passed = true; - break; - } - } - assertTrue(passed); + PTable<String, Pair<Collection<String>, Collection<String>>> cg = Cogroup.cogroup(kv1, kv2); + + Map<String, Pair<Collection<String>, Collection<String>>> actual = cg.materializeToMap(); + + Map<String, Pair<Collection<String>, Collection<String>>> expected = ImmutableMap.of( + "a", Pair.of(coll("1-1", "1-4"), coll()), + "b", Pair.of(coll("1-2"), coll("2-1")), + "c", Pair.of(coll("1-3"), coll("2-2", "2-3")), + "d", Pair.of(coll(), coll("2-4")) + ); + + assertThat(actual, is(expected)); } - - static class ConstantMapFn extends MapFn<StringWrapper, StringWrapper> { + + private static class KeyValueSplit extends DoFn<String, Pair<String, String>> { @Override - public StringWrapper map(StringWrapper input) { - return StringWrapper.wrap("key"); + public void process(String input, Emitter<Pair<String, String>> emitter) { + String[] fields = input.split(","); + emitter.emit(Pair.of(fields[0], fields[1])); } - } - - @Test - public void testCogroup_CheckObjectResultOnRichObjects() throws IOException { - Pipeline pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration()); - PTable<StringWrapper, StringWrapper> tableA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) - .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)) - .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class)); - PTable<StringWrapper, StringWrapper> tableB = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) - .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)) - .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class)); - - List<String> set1Values = Lists.newArrayList(); - List<String> set2Values = Lists.newArrayList(); - PTable<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> cogroup = Cogroup.cogroup(tableA, tableB); - for (Pair<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> entry : cogroup.materialize()) { - for (StringWrapper stringWrapper : entry.second().first()) { - set1Values.add(stringWrapper.getValue()); - } - for (StringWrapper stringWrapper : entry.second().second()) { - set2Values.add(stringWrapper.getValue()); - } - } - - Collections.sort(set1Values); - Collections.sort(set2Values); - - assertEquals(ImmutableList.of("a", "b", "c", "e"), set1Values); - assertEquals(ImmutableList.of("a", "c", "d"), set2Values); - + + private static Collection<String> coll(String... values) { + return ImmutableList.copyOf(values); } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/java/org/apache/crunch/test/Tests.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/test/Tests.java b/crunch/src/it/java/org/apache/crunch/test/Tests.java index 4c979af..e381c1a 100644 --- a/crunch/src/it/java/org/apache/crunch/test/Tests.java +++ b/crunch/src/it/java/org/apache/crunch/test/Tests.java @@ -17,16 +17,21 @@ */ package org.apache.crunch.test; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import java.util.Collection; import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.hadoop.io.Writable; import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.ImmutableList; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; import com.google.common.io.Resources; @@ -79,4 +84,41 @@ public final class Tests { new Object[][] { { MemPipeline.getInstance() }, { new MRPipeline(testCase) } }); } + + /** + * Serialize the given Writable into a byte array. + * + * @param value The instance to serialize + * @return The serialized data + */ + public static byte[] serialize(Writable value) { + checkNotNull(value); + try { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + value.write(out); + return out.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException("cannot serialize", e); + } + } + + /** + * Serialize the src Writable into a byte array, then deserialize it into dest. + * @param src The instance to serialize + * @param dest The instance to deserialize into + * @return dest, for convenience + */ + public static <T extends Writable> T roundtrip(Writable src, T dest) { + checkNotNull(src); + checkNotNull(dest); + checkArgument(src != dest, "src and dest may not be the same instance"); + + try { + byte[] data = serialize(src); + dest.readFields(ByteStreams.newDataInput(data)); + } catch (IOException e) { + throw new IllegalStateException("cannot deserialize", e); + } + return dest; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt new file mode 100644 index 0000000..9f38eb9 --- /dev/null +++ b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src1.txt @@ -0,0 +1,4 @@ +a,1-1 +b,1-2 +c,1-3 +a,1-4 http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt ---------------------------------------------------------------------- diff --git a/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt new file mode 100644 index 0000000..ed9524e --- /dev/null +++ b/crunch/src/it/resources/org/apache/crunch/lib/CogroupITData/src2.txt @@ -0,0 +1,4 @@ +b,2-1 +c,2-2 +c,2-3 +d,2-4 http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java index 79c93be..8b54008 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/GenericArrayWritable.java @@ -29,6 +29,12 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.io.WritableUtils; +/** + * A {@link Writable} for marshalling/unmarshalling Collections. Note that + * element order is <em>undefined</em>! + * + * @param <T> The value type + */ class GenericArrayWritable<T> implements Writable { private Writable[] values; private Class<? extends Writable> valueClass; @@ -58,7 +64,7 @@ class GenericArrayWritable<T> implements Writable { } String valueType = Text.readString(in); setValueType(valueType); - for (int i = 0; i < values.length; i++) { + for (int i = 0; i < values.length - nulls; i++) { Writable value = WritableFactories.newInstance(valueClass); value.readFields(in); // read a value values[i] = value; // store it in values @@ -80,21 +86,23 @@ class GenericArrayWritable<T> implements Writable { public void write(DataOutput out) throws IOException { WritableUtils.writeVInt(out, values.length); - int nulls = 0; - for (int i = 0; i < values.length; i++) { - if (values[i] == null) { - nulls++; - } - } - WritableUtils.writeVInt(out, nulls); - if (values.length - nulls > 0) { - if (valueClass == null) { - throw new IllegalStateException("Value class not set by constructor or read"); - } - Text.writeString(out, valueClass.getName()); + if (values.length > 0) { + int nulls = 0; for (int i = 0; i < values.length; i++) { - if (values[i] != null) { - values[i].write(out); + if (values[i] == null) { + nulls++; + } + } + WritableUtils.writeVInt(out, nulls); + if (values.length - nulls > 0) { + if (valueClass == null) { + throw new IllegalStateException("Value class not set by constructor or read"); + } + Text.writeString(out, valueClass.getName()); + for (int i = 0; i < values.length; i++) { + if (values[i] != null) { + values[i].write(out); + } } } } http://git-wip-us.apache.org/repos/asf/crunch/blob/222dd76a/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java new file mode 100644 index 0000000..c807a90 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/types/writable/GenericArrayWritableTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types.writable; + +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; + +import org.apache.crunch.test.Tests; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.junit.Test; + + +public class GenericArrayWritableTest { + + @Test + public void testEmpty() { + GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class); + src.set(new Text[0]); + + GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>()); + + assertThat(dest.get().length, is(0)); + } + + @Test + public void testNonEmpty() { + GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class); + src.set(new Text[] { new Text("foo"), new Text("bar") }); + + GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>()); + + assertThat(src.get(), not(sameInstance(dest.get()))); + assertThat(dest.get().length, is(2)); + assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("foo"), new Text("bar"))); + } + + @Test + public void testNulls() { + GenericArrayWritable<Text> src = new GenericArrayWritable<Text>(Text.class); + src.set(new Text[] { new Text("a"), null, new Text("b") }); + + GenericArrayWritable<Text> dest = Tests.roundtrip(src, new GenericArrayWritable<Text>()); + + assertThat(src.get(), not(sameInstance(dest.get()))); + assertThat(dest.get().length, is(3)); + assertThat(Arrays.asList(dest.get()), hasItems((Writable) new Text("a"), new Text("b"), null)); + } + +}
