Updated Branches: refs/heads/master 547d56fa8 -> 153617a44
Correct Avro deep copying on Pair and Collection. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/153617a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/153617a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/153617a4 Branch: refs/heads/master Commit: 153617a4444e439b6fe9dc06d3d4c5a3859307da Parents: 547d56f Author: Gabriel Reid <[email protected]> Authored: Thu Aug 9 22:53:55 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Thu Aug 9 22:53:55 2012 +0200 ---------------------------------------------------------------------- .../apache/crunch/types/CollectionDeepCopier.java | 49 ++++++++++ .../java/org/apache/crunch/types/DeepCopier.java | 37 ++++++++ .../org/apache/crunch/types/TupleDeepCopier.java | 52 +++++++++++ .../java/org/apache/crunch/types/TupleFactory.java | 23 +++++ .../apache/crunch/types/avro/AvroDeepCopier.java | 13 ++- .../org/apache/crunch/types/avro/AvroType.java | 24 ++++-- .../java/org/apache/crunch/types/avro/Avros.java | 64 ++++++++------ .../crunch/types/CollectionDeepCopierTest.java | 31 +++++++ .../apache/crunch/types/TupleDeepCopierTest.java | 49 ++++++++++ .../org/apache/crunch/types/TupleFactoryTest.java | 69 +++++++++++++++ .../crunch/types/avro/AvroDeepCopierTest.java | 10 ++- .../org/apache/crunch/types/avro/AvroTypeTest.java | 50 +++++++++-- 12 files changed, 421 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java new file mode 100644 index 0000000..5bdc715 --- /dev/null +++ 
b/crunch/src/main/java/org/apache/crunch/types/CollectionDeepCopier.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import java.util.Collection; +import java.util.List; + +import org.apache.hadoop.thirdparty.guava.common.collect.Lists; + +/** + * Performs deep copies (based on underlying PType deep copying) of Collections. 
+ * + * @param <T> + * The type of element in the Collection being copied + */ +public class CollectionDeepCopier<T> implements DeepCopier<Collection<T>> { + + + private PType<T> elementType; + + public CollectionDeepCopier(PType<T> elementType) { + this.elementType = elementType; + } + + @Override + public Collection<T> deepCopy(Collection<T> source) { + List<T> copiedCollection = Lists.newArrayListWithCapacity(source.size()); + for (T value : source){ + copiedCollection.add(elementType.getDetachedValue(value)); + } + return copiedCollection; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java new file mode 100644 index 0000000..539b808 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +/** + * Performs deep copies of values.
+ * + * @param <T> + * The type of value that will be copied + */ +public interface DeepCopier<T> { + + /** + * Create a deep copy of a value. + * + * @param source + * The value to be copied + * @return The deep copy of the value + */ + T deepCopy(T source); + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java new file mode 100644 index 0000000..094b582 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import java.util.List; + +import org.apache.crunch.Tuple; + +/** + * Performs deep copies (based on underlying PType deep copying) of Tuple-based + * objects. 
+ * + * @param <T> + * The type of Tuple implementation being copied + */ +public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> { + + private final TupleFactory<T> tupleFactory; + private final List<PType> elementTypes; + + public TupleDeepCopier(PType<T> ptype) { + tupleFactory = TupleFactory.getTupleFactory(ptype.getTypeClass()); + elementTypes = ptype.getSubTypes(); + } + + @Override + public T deepCopy(T source) { + Object[] deepCopyValues = new Object[source.size()]; + + for (int valueIndex = 0; valueIndex < elementTypes.size(); valueIndex++) { + PType elementType = elementTypes.get(valueIndex); + deepCopyValues[valueIndex] = elementType.getDetachedValue(source.get(valueIndex)); + } + + return tupleFactory.makeTuple(deepCopyValues); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java index 16c3bcd..c547cd6 100644 --- a/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java +++ b/crunch/src/main/java/org/apache/crunch/types/TupleFactory.java @@ -34,6 +34,28 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable { public abstract T makeTuple(Object... values); + /** + * Get the {@link TupleFactory} for a given Tuple implementation. Only + * standard Tuple implementations are supported. 
+ * + * @param tupleClass + * The class for which the factory is to be retrieved + * @return The appropriate TupleFactory + */ + public static <T extends Tuple> TupleFactory<T> getTupleFactory(Class<T> tupleClass) { + if (tupleClass == Pair.class) { + return (TupleFactory<T>) PAIR; + } else if (tupleClass == Tuple3.class) { + return (TupleFactory<T>) TUPLE3; + } else if (tupleClass == Tuple4.class) { + return (TupleFactory<T>) TUPLE4; + } else if (tupleClass == TupleN.class) { + return (TupleFactory<T>) TUPLEN; + } else { + throw new IllegalArgumentException("Can't create TupleFactory for " + tupleClass); + } + } + public static final TupleFactory<Pair> PAIR = new TupleFactory<Pair>() { @Override public Pair makeTuple(Object... values) { @@ -96,4 +118,5 @@ public abstract class TupleFactory<T extends Tuple> implements Serializable { } } } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 078353a..ad5ba04 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -36,6 +36,7 @@ import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.DeepCopier; /** * Performs deep copies of Avro-serializable objects. @@ -45,7 +46,7 @@ import org.apache.crunch.impl.mr.run.CrunchRuntimeException; * running in its own JVM, but it may well be a problem in any other kind of * multi-threaded context. 
*/ -public abstract class AvroDeepCopier<T> implements Serializable { +public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { private BinaryEncoder binaryEncoder; private BinaryDecoder binaryDecoder; @@ -67,7 +68,7 @@ public abstract class AvroDeepCopier<T> implements Serializable { private Class<T> valueClass; public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) { - super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader(schema)); + super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema)); this.valueClass = valueClass; } @@ -113,6 +114,10 @@ public abstract class AvroDeepCopier<T> implements Serializable { } } + public static class AvroTupleDeepCopier { + + } + /** * Create a deep copy of an Avro value. * @@ -120,6 +125,7 @@ public abstract class AvroDeepCopier<T> implements Serializable { * The value to be copied * @return The deep copy of the value */ + @Override public T deepCopy(T source) { ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); @@ -127,7 +133,8 @@ public abstract class AvroDeepCopier<T> implements Serializable { try { datumWriter.write(source, binaryEncoder); binaryEncoder.flush(); - binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); + binaryDecoder = DecoderFactory.get() + .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); datumReader.read(target, binaryDecoder); } catch (Exception e) { throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java index 
b3ce576..8d7fbd5 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.types.avro; +import java.util.Collection; import java.util.List; import org.apache.avro.Schema; @@ -25,11 +26,15 @@ import org.apache.avro.specific.SpecificRecord; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.MapFn; import org.apache.crunch.SourceTarget; +import org.apache.crunch.Tuple; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.io.avro.AvroFileSourceTarget; +import org.apache.crunch.types.CollectionDeepCopier; import org.apache.crunch.types.Converter; +import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.TupleDeepCopier; import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; @@ -50,13 +55,14 @@ public class AvroType<T> implements PType<T> { private final MapFn baseInputMapFn; private final MapFn baseOutputMapFn; private final List<PType> subTypes; - private AvroDeepCopier<T> deepCopier; + private DeepCopier<T> deepCopier; public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) { this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes); } - public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, PType... ptypes) { + public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, + PType... 
ptypes) { this.typeClass = typeClass; this.schema = Preconditions.checkNotNull(schema); this.schemaString = schema.toString(); @@ -125,7 +131,8 @@ public class AvroType<T> implements PType<T> { return false; } - return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class.isAssignableFrom(typeClass)); + return !(typeClass.equals(GenericData.Record.class) || SpecificRecord.class + .isAssignableFrom(typeClass)); } public MapFn<Object, T> getInputMapFn() { @@ -146,9 +153,13 @@ public class AvroType<T> implements PType<T> { return new AvroFileSourceTarget<T>(path, this); } - private AvroDeepCopier<T> getDeepCopier() { + private DeepCopier<T> getDeepCopier() { if (deepCopier == null) { - if (isSpecific()) { + if (Tuple.class.isAssignableFrom(this.typeClass)) { + deepCopier = new TupleDeepCopier(this); + } else if (Collection.class.isAssignableFrom(this.typeClass)){ + deepCopier = new CollectionDeepCopier(this.subTypes.get(0)); + } else if (isSpecific()) { deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema()); } else if (isGeneric()) { deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema()); @@ -160,7 +171,8 @@ public class AvroType<T> implements PType<T> { } public T getDetachedValue(T value) { - if (this.baseInputMapFn instanceof IdentityFn && !Avros.isPrimitive(this)) { + + if (!Avros.isPrimitive(this)) { return getDeepCopier().deepCopy(value); } return value; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index f8214a1..a6d7169 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -77,7 +77,8 @@ public class Avros { 
public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory"; public static void configureReflectDataFactory(Configuration conf) { - conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class); + conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), + ReflectDataFactory.class); } public static ReflectDataFactory getReflectDataFactory(Configuration conf) { @@ -109,8 +110,8 @@ public class Avros { } }; - private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING), - UTF8_TO_STRING, STRING_TO_UTF8); + private static final AvroType<String> strings = new AvroType<String>(String.class, + Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8); private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL); private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG); private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT); @@ -120,9 +121,10 @@ public class Avros { private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance()); - private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder() - .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats) - .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build(); + private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap + .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs) + .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles) + .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build(); private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap(); @@ -206,8 +208,8 @@ public class Avros { public T 
map(ByteBuffer input) { T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration()); try { - instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input - .limit()))); + instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input + .arrayOffset(), input.limit()))); } catch (IOException e) { LOG.error("Exception thrown reading instance of: " + writableClazz, e); } @@ -232,8 +234,8 @@ public class Avros { } public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) { - return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz), - new WritableToBytesMapFn<T>()); + return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>( + clazz), new WritableToBytesMapFn<T>()); } private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> { @@ -277,7 +279,8 @@ public class Avros { } } - private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> { + private static class CollectionToGenericDataArray extends + MapFn<Collection<?>, GenericData.Array<?>> { private final MapFn mapFn; private final String jsonSchema; @@ -319,8 +322,10 @@ public class Avros { public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) { AvroType<T> avroType = (AvroType<T>) ptype; Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema())); - GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn()); - CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn()); + GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>( + avroType.getInputMapFn()); + CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, + avroType.getOutputMapFn()); return new AvroType(Collection.class, 
collectionSchema, input, output, ptype); } @@ -520,23 +525,25 @@ public class Avros { return new AvroType(Pair.class, schema, input, output, p1, p2); } - public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) { + public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, + PType<V3> p3) { Schema schema = createTupleSchema(p1, p2, p3); - return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3), - new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3); + return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, + p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3); } - public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, - PType<V4> p4) { + public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, + PType<V2> p2, PType<V3> p3, PType<V4> p4) { Schema schema = createTupleSchema(p1, p2, p3, p4); - return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4), - new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4); + return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, + p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4); } public static final AvroType<TupleN> tuples(PType... ptypes) { Schema schema = createTupleSchema(ptypes); - return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), - new TupleToGenericRecord(schema, ptypes), ptypes); + return new AvroType(TupleN.class, schema, + new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema, + ptypes), ptypes); } public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... 
ptypes) { @@ -546,8 +553,8 @@ public class Avros { typeArgs[i] = ptypes[i].getTypeClass(); } TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs); - return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema, - ptypes), ptypes); + return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), + new TupleToGenericRecord(schema, ptypes), ptypes); } private static Schema createTupleSchema(PType<?>... ptypes) { @@ -564,11 +571,12 @@ public class Avros { return schema; } - public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, - PType<S> base) { + public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, + MapFn<T, S> outputFn, PType<S> base) { AvroType<S> abase = (AvroType<S>) base; - return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn), - new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray(new PType[0])); + return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), + inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray( + new PType[0])); } public static <T> PType<T> jsons(Class<T> clazz) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java new file mode 100644 index 0000000..7e532e8 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/types/CollectionDeepCopierTest.java @@ -0,0 +1,31 @@ +package org.apache.crunch.types; + +import static org.junit.Assert.*; + +import java.util.Collection; + +import org.apache.crunch.test.Person; +import 
org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class CollectionDeepCopierTest { + + @Test + public void testDeepCopy() { + Person person = new Person(); + person.setAge(42); + person.setName("John Smith"); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Collection<Person> personCollection = Lists.newArrayList(person); + CollectionDeepCopier<Person> collectionDeepCopier = new CollectionDeepCopier<Person>(Avros.records(Person.class)); + + Collection<Person> deepCopyCollection = collectionDeepCopier.deepCopy(personCollection); + + assertEquals(personCollection, deepCopyCollection); + assertNotSame(personCollection.iterator().next(), deepCopyCollection.iterator().next()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java new file mode 100644 index 0000000..8a3a12f --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; + +import org.apache.crunch.Pair; +import org.apache.crunch.test.Person; +import org.apache.crunch.types.avro.Avros; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TupleDeepCopierTest { + + @Test + public void testDeepCopy_Pair() { + Person person = new Person(); + person.setName("John Doe"); + person.setAge(42); + person.setSiblingnames(Lists.<CharSequence> newArrayList()); + + Pair<Integer, Person> inputPair = Pair.of(1, person); + DeepCopier<Pair<Integer, Person>> deepCopier = new TupleDeepCopier<Pair<Integer, Person>>( + Avros.pairs(Avros.ints(), Avros.records(Person.class))); + + Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair); + + assertEquals(inputPair, deepCopyPair); + assertNotSame(inputPair.second(), deepCopyPair.second()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java new file mode 100644 index 0000000..25b0371 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/types/TupleFactoryTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types; + +import static org.junit.Assert.assertEquals; + +import org.apache.crunch.Pair; +import org.apache.crunch.Tuple; +import org.apache.crunch.Tuple3; +import org.apache.crunch.Tuple4; +import org.apache.crunch.TupleN; +import org.junit.Test; + +public class TupleFactoryTest { + + @Test + public void testGetTupleFactory_Pair() { + assertEquals(TupleFactory.PAIR, TupleFactory.getTupleFactory(Pair.class)); + } + + @Test + public void testGetTupleFactory_Tuple3() { + assertEquals(TupleFactory.TUPLE3, TupleFactory.getTupleFactory(Tuple3.class)); + } + + @Test + public void testGetTupleFactory_Tuple4() { + assertEquals(TupleFactory.TUPLE4, TupleFactory.getTupleFactory(Tuple4.class)); + } + + @Test + public void testGetTupleFactory_TupleN() { + assertEquals(TupleFactory.TUPLEN, TupleFactory.getTupleFactory(TupleN.class)); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetTupleFactory_CustomTupleClass() { + TupleFactory.getTupleFactory(CustomTupleImplementation.class); + } + + private static class CustomTupleImplementation implements Tuple { + + @Override + public Object get(int index) { + return null; + } + + @Override + public int size() { + return 0; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java 
b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java index d966dfe..fa1b4c4 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java @@ -36,7 +36,8 @@ public class AvroDeepCopierTest { person.setAge(42); person.setSiblingnames(Lists.<CharSequence> newArrayList()); - Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$).deepCopy(person); + Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$) + .deepCopy(person); assertEquals(person, deepCopyPerson); assertNotSame(person, deepCopyPerson); @@ -49,7 +50,8 @@ public class AvroDeepCopierTest { record.put("age", 42); record.put("siblingnames", Lists.newArrayList()); - Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(record); + Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$) + .deepCopy(record); assertEquals(record, deepCopyRecord); assertNotSame(record, deepCopyRecord); @@ -62,8 +64,8 @@ public class AvroDeepCopierTest { person.setAge(42); person.setSiblingnames(Lists.<CharSequence> newArrayList()); - Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, Person.SCHEMA$) - .deepCopy(person); + Person deepCopyPerson = new AvroDeepCopier.AvroReflectDeepCopier<Person>(Person.class, + Person.SCHEMA$).deepCopy(person); assertEquals(person, deepCopyPerson); assertNotSame(person, deepCopyPerson); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/153617a4/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java index 2a80a5e..486bd1a 100644 --- 
a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java @@ -23,8 +23,12 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.util.Collection; +import java.util.List; + import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; +import org.apache.crunch.Pair; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.junit.Test; @@ -152,15 +156,19 @@ public class AvroTypeTest { assertEquals(record, detachedRecord); assertNotSame(record, detachedRecord); } - - @Test - public void testGetDetachedValue_SpecificAvroType() { - AvroType<Person> specificType = Avros.records(Person.class); + + private Person createPerson(){ Person person = new Person(); person.setName("name value"); person.setAge(42); person.setSiblingnames(Lists.<CharSequence> newArrayList()); + return person; + } + @Test + public void testGetDetachedValue_SpecificAvroType() { + AvroType<Person> specificType = Avros.records(Person.class); + Person person = createPerson(); Person detachedPerson = specificType.getDetachedValue(person); assertEquals(person, detachedPerson); assertNotSame(person, detachedPerson); @@ -169,14 +177,38 @@ public class AvroTypeTest { @Test public void testGetDetachedValue_ReflectAvroType() { AvroType<Person> reflectType = Avros.reflects(Person.class); - Person person = new Person(); - person.setName("name value"); - person.setAge(42); - person.setSiblingnames(Lists.<CharSequence> newArrayList()); - + Person person = createPerson(); Person detachedPerson = reflectType.getDetachedValue(person); assertEquals(person, detachedPerson); assertNotSame(person, detachedPerson); } + @Test + public void testGetDetachedValue_Pair() { + Person person = createPerson(); + AvroType<Pair<Integer, Person>> pairType = Avros.pairs(Avros.ints(), + 
Avros.records(Person.class)); + + Pair<Integer, Person> inputPair = Pair.of(1, person); + Pair<Integer, Person> detachedPair = pairType.getDetachedValue(inputPair); + + assertEquals(inputPair, detachedPair); + assertNotSame(inputPair.second(), detachedPair.second()); + } + + @Test + public void testGetDetachedValue_Collection(){ + Person person = createPerson(); + List<Person> personList = Lists.newArrayList(person); + + AvroType<Collection<Person>> collectionType = Avros.collections(Avros.records(Person.class)); + + Collection<Person> detachedCollection = collectionType.getDetachedValue(personList); + + assertEquals(personList, detachedCollection); + Person detachedPerson = detachedCollection.iterator().next(); + + assertNotSame(person, detachedPerson); + } + }
