Updated Branches: refs/heads/master 78a3eb7d9 -> 0ba630bb1
CRUNCH-117: Allow custom subclasses of Tuple to work correctly with the DeepCopier logic Signed-off-by: Gabriel Reid <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/0ba630bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/0ba630bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/0ba630bb Branch: refs/heads/master Commit: 0ba630bb10765c2e74283b6c23560f55f9ca4e74 Parents: 78a3eb7 Author: Josh Wills <[email protected]> Authored: Mon Nov 19 16:05:59 2012 -0800 Committer: Gabriel Reid <[email protected]> Committed: Wed Nov 21 22:15:01 2012 +0100 ---------------------------------------------------------------------- .../org/apache/crunch/CollectionPObjectIT.java | 10 -- .../org/apache/crunch/CollectionsLengthIT.java | 6 - .../org/apache/crunch/DeepCopyCustomTuplesIT.java | 79 +++++++++++++++ .../org/apache/crunch/FirstElementPObjectIT.java | 11 -- .../src/it/java/org/apache/crunch/PObjectsIT.java | 9 -- .../src/it/java/org/apache/crunch/PageRankIT.java | 1 - .../it/java/org/apache/crunch/lib/AggregateIT.java | 2 - .../java/org/apache/crunch/types/TupleFactory.java | 18 +++- .../org/apache/crunch/types/TupleFactoryTest.java | 4 +- 9 files changed, 96 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java b/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java index 2157d24..7e0c75c 100644 --- a/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java +++ b/crunch/src/it/java/org/apache/crunch/CollectionPObjectIT.java @@ -18,11 +18,8 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.lang.Integer; -import java.lang.Iterable; import java.lang.String; import java.util.Collection; @@ -32,18 +29,11 @@ import org.apache.crunch.Pipeline; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.materialize.pobject.CollectionPObject; -import org.apache.crunch.materialize.pobject.PObjectImpl; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.PTypeFamily; -import org.apache.crunch.types.avro.AvroTypeFamily; -import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - @SuppressWarnings("serial") public class CollectionPObjectIT { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java index 8fb03ee..3a38b92 100644 --- a/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java +++ b/crunch/src/it/java/org/apache/crunch/CollectionsLengthIT.java @@ -18,13 +18,10 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.Long; -import java.util.Collection; -import org.apache.crunch.PObject; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.test.TemporaryPath; @@ -35,9 +32,6 @@ import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - @SuppressWarnings("serial") public class CollectionsLengthIT { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java b/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java new file mode 100644 index 0000000..f1323ca --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/DeepCopyCustomTuplesIT.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.apache.crunch.types.avro.Avros.*; +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PType; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * + */ +public class DeepCopyCustomTuplesIT { + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + public static class PID extends Pair<Integer, String> { + public PID(Integer first, String second) { + super(first, second); + } + } + + private static PType<PID> pids = tuples(PID.class, ints(), strings()); + + @Test + public void testDeepCopyCustomTuple() throws Exception { + Pipeline p = new MRPipeline(DeepCopyCustomTuplesIT.class, tmpDir.getDefaultConfiguration()); + String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); + PCollection<String> shakes = p.readTextFile(shakesInputPath); + Iterable<String> out = shakes + .parallelDo(new PreProcFn(), tableOf(ints(), pairs(ints(), pids))) + .groupByKey() + .parallelDo(new PostProcFn(), strings()) + .materialize(); + assertEquals(65, Iterables.size(out)); + p.done(); + } + + private static class PreProcFn extends MapFn<String, Pair<Integer, Pair<Integer, PID>>> { + private int counter = 0; + @Override + public Pair<Integer, Pair<Integer, PID>> map(String input) { + return Pair.of(counter++, Pair.of(counter++, new PID(input.length(), input))); + } + }; + + private static class PostProcFn extends DoFn<Pair<Integer, Iterable<Pair<Integer, PID>>>, String> { + @Override + public void process(Pair<Integer, Iterable<Pair<Integer, PID>>> input, Emitter<String> emitter) { + for (Pair<Integer, PID> p : input.second()) { + if (p.second().first() > 0 && p.second().first() < 10) { + emitter.emit(p.second().second()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java b/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java index 3efcf30..d985e10 100644 --- a/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java +++ b/crunch/src/it/java/org/apache/crunch/FirstElementPObjectIT.java @@ -18,31 +18,20 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.lang.Integer; -import java.lang.Iterable; import java.lang.String; -import java.util.Collection; import org.apache.crunch.PCollection; import org.apache.crunch.PObject; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.materialize.pobject.FirstElementPObject; -import org.apache.crunch.materialize.pobject.PObjectImpl; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.PTypeFamily; -import org.apache.crunch.types.avro.AvroTypeFamily; -import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - @SuppressWarnings("serial") public class FirstElementPObjectIT { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/PObjectsIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java b/crunch/src/it/java/org/apache/crunch/PObjectsIT.java index a3810dc..6ee849f 100644 --- a/crunch/src/it/java/org/apache/crunch/PObjectsIT.java +++ b/crunch/src/it/java/org/apache/crunch/PObjectsIT.java @@ -18,31 +18,22 @@ package org.apache.crunch; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.Integer; import java.lang.Iterable; import java.lang.String; -import java.util.Collection; import java.util.Iterator; import org.apache.crunch.PCollection; -import org.apache.crunch.PObject; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.materialize.pobject.PObjectImpl; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; -import org.apache.crunch.types.PTypeFamily; -import org.apache.crunch.types.avro.AvroTypeFamily; -import org.apache.crunch.types.writable.WritableTypeFamily; import org.junit.Rule; import org.junit.Test; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - @SuppressWarnings("serial") public class PObjectsIT { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/PageRankIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/PageRankIT.java b/crunch/src/it/java/org/apache/crunch/PageRankIT.java index 5a555b3..6291ef8 100644 --- a/crunch/src/it/java/org/apache/crunch/PageRankIT.java +++ b/crunch/src/it/java/org/apache/crunch/PageRankIT.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import java.util.Collection; import java.util.List; -import org.apache.crunch.PObject; import org.apache.crunch.impl.mem.MemPipeline; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.lib.Aggregate; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java index 88596aa..56ee3ac 100644 --- a/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java +++ b/crunch/src/it/java/org/apache/crunch/lib/AggregateIT.java @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; -import org.apache.crunch.PObject; import org.apache.crunch.PTable; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; @@ -48,7 +47,6 @@ import org.junit.Rule; import org.junit.Test; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; public class AggregateIT { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java index c547cd6..69fbc92 100644 --- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java +++ b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java @@ -19,6 +19,7 @@ package org.apache.crunch.types; import java.io.Serializable; import java.lang.reflect.Constructor; +import java.util.Map; import org.apache.crunch.Pair; import org.apache.crunch.Tuple; @@ -27,6 +28,8 @@ import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import com.google.common.collect.Maps; + public abstract class TupleFactory<T extends Tuple> implements Serializable { public void initialize() { @@ -34,9 +37,11 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable { public abstract T makeTuple(Object... values); + + private static final Map<Class, TupleFactory> customTupleFactories = Maps.newHashMap(); + /** - * Get the {@link TupleFactory} for a given Tuple implementation. Only - * standard Tuple implementations are supported. + * Get the {@link TupleFactory} for a given Tuple implementation. * * @param tupleClass * The class for which the factory is to be retrieved @@ -51,6 +56,8 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable { return (TupleFactory<T>) TUPLE4; } else if (tupleClass == TupleN.class) { return (TupleFactory<T>) TUPLEN; + } else if (customTupleFactories.containsKey(tupleClass)) { + return (TupleFactory<T>) customTupleFactories.get(tupleClass); } else { throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass); } @@ -85,7 +92,12 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable { }; public static <T extends Tuple> TupleFactory<T> create(Class<T> clazz, Class... typeArgs) { - return new CustomTupleFactory<T>(clazz, typeArgs); + if (customTupleFactories.containsKey(clazz)) { + return (TupleFactory<T>) customTupleFactories.get(clazz); + } + TupleFactory<T> custom = new CustomTupleFactory<T>(clazz, typeArgs); + customTupleFactories.put(clazz, custom); + return custom; } private static class CustomTupleFactory<T extends Tuple> extends TupleFactory<T> { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/0ba630bb/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java index 25b0371..0726be2 100644 --- a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java @@ -48,9 +48,9 @@ public class TupleFactoryTest { assertEquals(TupleFactory.TUPLEN, TupleFactory.getTupleFactory(TupleN.class)); } - @Test(expected = IllegalArgumentException.class) public void testGetTupleFactory_CustomTupleClass() { - TupleFactory.getTupleFactory(CustomTupleImplementation.class); + TupleFactory<CustomTupleImplementation> customTupleFactory = TupleFactory.create(CustomTupleImplementation.class); + assertEquals(customTupleFactory, TupleFactory.getTupleFactory(CustomTupleImplementation.class)); } private static class CustomTupleImplementation implements Tuple {
