Updated Branches: refs/heads/master e487447b0 -> 2932e9e4a
CRUNCH-45 - Fix deep copying on Writables. Correct broken getDetachedValue for Writable tuples, collections, and maps. Also refactor Writable deep copying to use MapFns that are already present. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2932e9e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2932e9e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2932e9e4 Branch: refs/heads/master Commit: 2932e9e4a053414e961b77e419a8b1d07ee2daa2 Parents: e487447 Author: Gabriel Reid <[email protected]> Authored: Wed Aug 15 09:30:32 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Thu Aug 16 06:56:37 2012 +0200 ---------------------------------------------------------------------- .../java/org/apache/crunch/types/DeepCopier.java | 13 ++- .../org/apache/crunch/types/TupleDeepCopier.java | 8 +- .../apache/crunch/types/avro/AvroDeepCopier.java | 74 +++++++++-- .../apache/crunch/types/avro/AvroTableType.java | 3 +- .../org/apache/crunch/types/avro/AvroType.java | 38 +----- .../java/org/apache/crunch/types/avro/Avros.java | 95 ++++++++------- .../crunch/types/writable/WritableDeepCopier.java | 60 +++++++++ .../apache/crunch/types/writable/WritableType.java | 16 +-- .../apache/crunch/types/writable/Writables.java | 36 +----- .../org/apache/crunch/types/PTypeUtilsTest.java | 2 +- .../apache/crunch/types/TupleDeepCopierTest.java | 4 +- .../crunch/types/avro/AvroDeepCopierTest.java | 1 + .../org/apache/crunch/types/avro/AvroTypeTest.java | 14 ++- .../org/apache/crunch/types/avro/AvrosTest.java | 5 +- .../types/writable/WritableDeepCopierTest.java | 46 +++++++ .../crunch/types/writable/WritableTypeTest.java | 53 +++++++- .../crunch/types/writable/WritablesTest.java | 7 - 17 files changed, 316 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java index 539b808..a96e7bf 100644 --- a/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/DeepCopier.java @@ -17,13 +17,15 @@ */ package org.apache.crunch.types; +import java.io.Serializable; + /** * Performs deep copies of values. * * @param <T> * The type of value that will be copied */ -public interface DeepCopier<T> { +public interface DeepCopier<T> extends Serializable { /** * Create a deep copy of a value. @@ -33,5 +35,14 @@ public interface DeepCopier<T> { * @return The deep copy of the value */ T deepCopy(T source); + + static class NoOpDeepCopier<V> implements DeepCopier<V> { + @Override + public V deepCopy(V source) { + return source; + } + + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java index 094b582..4f473a0 100644 --- a/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/TupleDeepCopier.java @@ -21,6 +21,8 @@ import java.util.List; import org.apache.crunch.Tuple; +import com.google.common.collect.Lists; + /** * Performs deep copies (based on underlying PType deep copying) of Tuple-based * objects. 
@@ -33,9 +35,9 @@ public class TupleDeepCopier<T extends Tuple> implements DeepCopier<T> { private final TupleFactory<T> tupleFactory; private final List<PType> elementTypes; - public TupleDeepCopier(PType<T> ptype) { - tupleFactory = TupleFactory.getTupleFactory(ptype.getTypeClass()); - elementTypes = ptype.getSubTypes(); + public TupleDeepCopier(Class<T> tupleClass, PType...elementTypes) { + tupleFactory = TupleFactory.getTupleFactory(tupleClass); + this.elementTypes = Lists.newArrayList(elementTypes); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index ad5ba04..fe4fe1a 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -48,18 +48,31 @@ import org.apache.crunch.types.DeepCopier; */ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { + private String jsonSchema; + private transient Schema schema; private BinaryEncoder binaryEncoder; private BinaryDecoder binaryDecoder; - protected DatumWriter<T> datumWriter; - protected DatumReader<T> datumReader; - protected AvroDeepCopier(DatumWriter<T> datumWriter, DatumReader<T> datumReader) { - this.datumWriter = datumWriter; - this.datumReader = datumReader; + private transient DatumWriter<T> datumWriter; + private transient DatumReader<T> datumReader; + + public AvroDeepCopier(Schema schema) { + this.jsonSchema = schema.toString(); + } + + protected Schema getSchema() { + if (schema == null) { + schema = new Schema.Parser().parse(jsonSchema); + } + return schema; } protected abstract T createCopyTarget(); + protected abstract DatumWriter<T> createDatumWriter(); 
+ + protected abstract DatumReader<T> createDatumReader(); + /** * Deep copier for Avro specific data objects. */ @@ -68,7 +81,7 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { private Class<T> valueClass; public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) { - super(new SpecificDatumWriter<T>(schema), new SpecificDatumReader<T>(schema)); + super(schema); this.valueClass = valueClass; } @@ -77,6 +90,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { return createNewInstance(valueClass); } + @Override + protected DatumWriter<T> createDatumWriter() { + return new SpecificDatumWriter<T>(getSchema()); + } + + @Override + protected DatumReader<T> createDatumReader() { + return new SpecificDatumReader<T>(getSchema()); + } + } /** @@ -84,16 +107,25 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { */ public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> { - private Schema schema; + private transient Schema schema; public AvroGenericDeepCopier(Schema schema) { - super(new GenericDatumWriter<Record>(schema), new GenericDatumReader<Record>(schema)); - this.schema = schema; + super(schema); } @Override protected Record createCopyTarget() { - return new GenericData.Record(schema); + return new GenericData.Record(getSchema()); + } + + @Override + protected DatumReader<Record> createDatumReader() { + return new GenericDatumReader<Record>(getSchema()); + } + + @Override + protected DatumWriter<Record> createDatumWriter() { + return new GenericDatumWriter<Record>(getSchema()); } } @@ -101,10 +133,11 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { * Deep copier for Avro reflect data objects. 
*/ public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> { + private Class<T> valueClass; public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) { - super(new ReflectDatumWriter<T>(schema), new ReflectDatumReader<T>(schema)); + super(schema); this.valueClass = valueClass; } @@ -112,6 +145,16 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { protected T createCopyTarget() { return createNewInstance(valueClass); } + + @Override + protected DatumReader<T> createDatumReader() { + return new ReflectDatumReader<T>(getSchema()); + } + + @Override + protected DatumWriter<T> createDatumWriter() { + return new ReflectDatumWriter<T>(getSchema()); + } } public static class AvroTupleDeepCopier { @@ -127,14 +170,19 @@ public abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { */ @Override public T deepCopy(T source) { + if (datumReader == null) { + datumReader = createDatumReader(); + } + if (datumWriter == null) { + datumWriter = createDatumWriter(); + } ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); T target = createCopyTarget(); try { datumWriter.write(source, binaryEncoder); binaryEncoder.flush(); - binaryDecoder = DecoderFactory.get() - .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); + binaryDecoder = DecoderFactory.get().binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); datumReader.read(target, binaryDecoder); } catch (Exception e) { throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java 
b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index eb26bf1..bd4b14c 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -25,6 +25,7 @@ import org.apache.crunch.lib.PTables; import org.apache.crunch.types.PGroupedTableType; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.crunch.types.TupleDeepCopier; import org.apache.hadoop.conf.Configuration; /** @@ -121,7 +122,7 @@ public class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableT public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) { super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType, - valueType), keyType, valueType); + valueType), new TupleDeepCopier(Pair.class, keyType, valueType), keyType, valueType); this.keyType = keyType; this.valueType = valueType; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java index 82c4c91..6e35f5b 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -17,9 +17,7 @@ */ package org.apache.crunch.types.avro; -import java.util.Collection; import java.util.List; -import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -27,16 +25,12 @@ import org.apache.avro.specific.SpecificRecord; import org.apache.commons.lang.builder.HashCodeBuilder; import 
org.apache.crunch.MapFn; import org.apache.crunch.SourceTarget; -import org.apache.crunch.Tuple; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.io.avro.AvroFileSourceTarget; -import org.apache.crunch.types.CollectionDeepCopier; import org.apache.crunch.types.Converter; import org.apache.crunch.types.DeepCopier; -import org.apache.crunch.types.MapDeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; -import org.apache.crunch.types.TupleDeepCopier; import org.apache.hadoop.fs.Path; import com.google.common.base.Preconditions; @@ -59,17 +53,18 @@ public class AvroType<T> implements PType<T> { private final List<PType> subTypes; private DeepCopier<T> deepCopier; - public AvroType(Class<T> typeClass, Schema schema, PType... ptypes) { - this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), ptypes); + public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) { + this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes); } - public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, + public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, DeepCopier<T> deepCopier, PType... 
ptypes) { this.typeClass = typeClass; this.schema = Preconditions.checkNotNull(schema); this.schemaString = schema.toString(); this.baseInputMapFn = inputMapFn; this.baseOutputMapFn = outputMapFn; + this.deepCopier = deepCopier; this.subTypes = ImmutableList.<PType> builder().add(ptypes).build(); } @@ -155,31 +150,8 @@ public class AvroType<T> implements PType<T> { return new AvroFileSourceTarget<T>(path, this); } - private DeepCopier<T> getDeepCopier() { - if (deepCopier == null) { - if (Tuple.class.isAssignableFrom(this.typeClass)) { - deepCopier = new TupleDeepCopier(this); - } else if (Map.class.isAssignableFrom(this.typeClass)){ - deepCopier = new MapDeepCopier(this.subTypes.get(0)); - } else if (Collection.class.isAssignableFrom(this.typeClass)){ - deepCopier = new CollectionDeepCopier(this.subTypes.get(0)); - } else if (isSpecific()) { - deepCopier = new AvroDeepCopier.AvroSpecificDeepCopier<T>(typeClass, getSchema()); - } else if (isGeneric()) { - deepCopier = (AvroDeepCopier<T>) new AvroDeepCopier.AvroGenericDeepCopier(getSchema()); - } else { - deepCopier = new AvroDeepCopier.AvroReflectDeepCopier<T>(typeClass, getSchema()); - } - } - return deepCopier; - } - public T getDetachedValue(T value) { - - if (!Avros.isPrimitive(this)) { - return getDeepCopier().deepCopy(value); - } - return value; + return deepCopier.deepCopy(value); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java index 00a297c..038f805 100644 --- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -44,9 +44,14 @@ import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; import 
org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; +import org.apache.crunch.types.CollectionDeepCopier; +import org.apache.crunch.types.DeepCopier; +import org.apache.crunch.types.MapDeepCopier; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; +import org.apache.crunch.types.TupleDeepCopier; import org.apache.crunch.types.TupleFactory; +import org.apache.crunch.types.writable.WritableDeepCopier; import org.apache.crunch.util.PTypes; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; @@ -77,8 +82,7 @@ public class Avros { public static final String REFLECT_DATA_FACTORY_CLASS = "crunch.reflectdatafactory"; public static void configureReflectDataFactory(Configuration conf) { - conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), - ReflectDataFactory.class); + conf.setClass(REFLECT_DATA_FACTORY_CLASS, REFLECT_DATA_FACTORY.getClass(), ReflectDataFactory.class); } public static ReflectDataFactory getReflectDataFactory(Configuration conf) { @@ -110,8 +114,8 @@ public class Avros { } }; - private static final AvroType<String> strings = new AvroType<String>(String.class, - Schema.create(Schema.Type.STRING), UTF8_TO_STRING, STRING_TO_UTF8); + private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING), + UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>()); private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL); private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG); private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT); @@ -119,12 +123,11 @@ public class Avros { private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE); private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN); private static final AvroType<ByteBuffer> bytes = new 
AvroType<ByteBuffer>(ByteBuffer.class, - Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance()); + Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>()); - private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap - .<Class<?>, PType<?>> builder().put(String.class, strings).put(Long.class, longs) - .put(Integer.class, ints).put(Float.class, floats).put(Double.class, doubles) - .put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build(); + private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder() + .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats) + .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build(); private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap(); @@ -141,7 +144,7 @@ public class Avros { } private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) { - return new AvroType<T>(clazz, Schema.create(schemaType)); + return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>()); } public static final AvroType<Void> nulls() { @@ -184,7 +187,8 @@ public class Avros { } public static final AvroType<GenericData.Record> generics(Schema schema) { - return new AvroType<GenericData.Record>(GenericData.Record.class, schema); + return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier( + schema)); } public static final <T> AvroType<T> containers(Class<T> clazz) { @@ -192,7 +196,8 @@ public class Avros { } public static final <T> AvroType<T> reflects(Class<T> clazz) { - return new AvroType<T>(clazz, REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz)); + Schema schema = REFLECT_DATA_FACTORY.getReflectData().getSchema(clazz); + return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroReflectDeepCopier<T>(clazz, 
schema)); } private static class BytesToWritableMapFn<T extends Writable> extends MapFn<ByteBuffer, T> { @@ -208,8 +213,8 @@ public class Avros { public T map(ByteBuffer input) { T instance = ReflectionUtils.newInstance(writableClazz, getConfiguration()); try { - instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input - .arrayOffset(), input.limit()))); + instance.readFields(new DataInputStream(new ByteArrayInputStream(input.array(), input.arrayOffset(), input + .limit()))); } catch (IOException e) { LOG.error("Exception thrown reading instance of: " + writableClazz, e); } @@ -234,8 +239,8 @@ public class Avros { } public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) { - return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>( - clazz), new WritableToBytesMapFn<T>()); + return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz), + new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz)); } private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> { @@ -279,8 +284,7 @@ public class Avros { } } - private static class CollectionToGenericDataArray extends - MapFn<Collection<?>, GenericData.Array<?>> { + private static class CollectionToGenericDataArray extends MapFn<Collection<?>, GenericData.Array<?>> { private final MapFn mapFn; private final String jsonSchema; @@ -322,11 +326,9 @@ public class Avros { public static final <T> AvroType<Collection<T>> collections(PType<T> ptype) { AvroType<T> avroType = (AvroType<T>) ptype; Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema())); - GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>( - avroType.getInputMapFn()); - CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, - avroType.getOutputMapFn()); - return new AvroType(Collection.class, collectionSchema, input, output, 
ptype); + GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn()); + CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn()); + return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype); } private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> { @@ -398,7 +400,7 @@ public class Avros { Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema())); AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn()); MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn()); - return new AvroType(Map.class, mapSchema, inputFn, outputFn, ptype); + return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype); } private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> { @@ -459,7 +461,7 @@ public class Avros { private final String jsonSchema; private final boolean isReflect; private transient Schema schema; - + public TupleToGenericRecord(Schema schema, PType<?>... 
ptypes) { this.fns = Lists.newArrayList(); this.avroTypes = Lists.newArrayList(); @@ -497,13 +499,13 @@ public class Avros { fn.setContext(getContext()); } } - - private GenericRecord createRecord(){ + + private GenericRecord createRecord() { if (isReflect) { return new ReflectGenericRecord(schema); } else { return new GenericData.Record(schema); - } + } } @Override @@ -525,28 +527,27 @@ public class Avros { Schema schema = createTupleSchema(p1, p2); GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2); TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2); - return new AvroType(Pair.class, schema, input, output, p1, p2); + return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2); } - public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, - PType<V3> p3) { + public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) { Schema schema = createTupleSchema(p1, p2, p3); - return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, - p3), new TupleToGenericRecord(schema, p1, p2, p3), p1, p2, p3); + return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3), + new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3); } - public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, - PType<V2> p2, PType<V3> p3, PType<V4> p4) { + public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3, + PType<V4> p4) { Schema schema = createTupleSchema(p1, p2, p3, p4); - return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, - p3, p4), new TupleToGenericRecord(schema, p1, p2, p3, p4), p1, p2, p3, p4); + return new AvroType(Tuple4.class, schema, new 
GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4), + new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2, + p3, p4); } public static final AvroType<TupleN> tuples(PType... ptypes) { Schema schema = createTupleSchema(ptypes); - return new AvroType(TupleN.class, schema, - new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), new TupleToGenericRecord(schema, - ptypes), ptypes); + return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), + new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes); } public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) { @@ -556,8 +557,8 @@ public class Avros { typeArgs[i] = ptypes[i].getTypeClass(); } TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs); - return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), - new TupleToGenericRecord(schema, ptypes), ptypes); + return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema, + ptypes), new TupleDeepCopier(clazz, ptypes), ptypes); } private static Schema createTupleSchema(PType<?>... 
ptypes) { @@ -574,12 +575,12 @@ public class Avros { return schema; } - public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, - MapFn<T, S> outputFn, PType<S> base) { + public static final <S, T> AvroType<T> derived(Class<T> clazz, MapFn<S, T> inputFn, MapFn<T, S> outputFn, + PType<S> base) { AvroType<S> abase = (AvroType<S>) base; - return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), - inputFn), new CompositeMapFn(outputFn, abase.getOutputMapFn()), base.getSubTypes().toArray( - new PType[0])); + return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn), + new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes() + .toArray(new PType[0])); } public static <T> PType<T> jsons(Class<T> clazz) { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java new file mode 100644 index 0000000..6469208 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableDeepCopier.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types.writable; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; + +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.DeepCopier; +import org.apache.hadoop.io.Writable; + + +/** + * Performs deep copies of Writable values. + * @param <T> The type of Writable that can be copied + */ +public class WritableDeepCopier<T extends Writable> implements DeepCopier<T>{ + + private Class<T> writableClass; + + public WritableDeepCopier(Class<T> writableClass){ + this.writableClass = writableClass; + } + + @Override + public T deepCopy(T source) { + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); + DataOutputStream dataOut = new DataOutputStream(byteOutStream); + T copiedValue = null; + try { + source.write(dataOut); + dataOut.flush(); + ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); + DataInput dataInput = new DataInputStream(byteInStream); + copiedValue = writableClass.newInstance(); + copiedValue.readFields(dataInput); + } catch (Exception e) { + throw new CrunchRuntimeException("Error while deep copying " + source, e); + } + return copiedValue; + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java ---------------------------------------------------------------------- diff --git 
a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java index 45502a3..23c95ea 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/WritableType.java @@ -22,9 +22,9 @@ import java.util.List; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.crunch.MapFn; import org.apache.crunch.SourceTarget; -import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.io.seq.SeqFileSourceTarget; import org.apache.crunch.types.Converter; +import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PType; import org.apache.crunch.types.PTypeFamily; import org.apache.hadoop.fs.Path; @@ -39,6 +39,7 @@ public class WritableType<T, W extends Writable> implements PType<T> { private final Converter converter; private final MapFn<W, T> inputFn; private final MapFn<T, W> outputFn; + private final DeepCopier<W> deepCopier; private final List<PType> subTypes; WritableType(Class<T> typeClass, Class<W> writableClass, MapFn<W, T> inputDoFn, MapFn<T, W> outputDoFn, @@ -48,6 +49,7 @@ public class WritableType<T, W extends Writable> implements PType<T> { this.inputFn = inputDoFn; this.outputFn = outputDoFn; this.converter = new WritableValueConverter(writableClass); + this.deepCopier = new WritableDeepCopier<W>(writableClass); this.subTypes = ImmutableList.<PType> builder().add(subTypes).build(); } @@ -99,17 +101,11 @@ public class WritableType<T, W extends Writable> implements PType<T> { return (typeClass.equals(wt.typeClass) && writableClass.equals(wt.writableClass) && subTypes.equals(wt.subTypes)); } - // Unchecked warnings are suppressed because we know that W and T are the same - // type (due to the IdentityFn being used) - @SuppressWarnings("unchecked") @Override public T getDetachedValue(T value) { - if (this.inputFn.getClass().equals(IdentityFn.class)) { - W 
writableValue = (W) value; - return (T) Writables.deepCopy(writableValue, this.writableClass); - } else { - return value; - } + W writableValue = outputFn.map(value); + W deepCopy = this.deepCopier.deepCopy(writableValue); + return inputFn.map(deepCopy); } @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java index f4906b7..23bc7f5 100644 --- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java +++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java @@ -17,11 +17,6 @@ */ package org.apache.crunch.types.writable; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; @@ -35,8 +30,11 @@ import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; import org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; -import org.apache.crunch.impl.mr.run.CrunchRuntimeException; +import org.apache.crunch.types.CollectionDeepCopier; +import org.apache.crunch.types.DeepCopier; +import org.apache.crunch.types.MapDeepCopier; import org.apache.crunch.types.PType; +import org.apache.crunch.types.TupleDeepCopier; import org.apache.crunch.types.TupleFactory; import org.apache.crunch.util.PTypes; import org.apache.hadoop.conf.Configuration; @@ -582,31 +580,7 @@ public class Writables { return PTypes.jsonString(clazz, WritableTypeFamily.getInstance()); } - /** - * Perform a deep copy of a writable value. 
- * - * @param value - * The value to be copied - * @param writableClass - * The Writable class of the value to be copied - * @return A fully detached deep copy of the input value - */ - public static <T extends Writable> T deepCopy(T value, Class<T> writableClass) { - ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); - DataOutputStream dataOut = new DataOutputStream(byteOutStream); - T copiedValue = null; - try { - value.write(dataOut); - dataOut.flush(); - ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray()); - DataInput dataInput = new DataInputStream(byteInStream); - copiedValue = writableClass.newInstance(); - copiedValue.readFields(dataInput); - } catch (Exception e) { - throw new CrunchRuntimeException("Error while deep copying " + value, e); - } - return copiedValue; - } + // Not instantiable private Writables() { http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java index f9217d5..e6fd90c 100644 --- a/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/PTypeUtilsTest.java @@ -77,7 +77,7 @@ public class PTypeUtilsTest { @Test public void testAvroRegistered() { - AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING)); + AvroType<Utf8> at = new AvroType<Utf8>(Utf8.class, Schema.create(Schema.Type.STRING), new DeepCopier.NoOpDeepCopier<Utf8>()); Avros.register(Utf8.class, at); assertEquals(at, Avros.records(Utf8.class)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java 
---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java index 8a3a12f..017a813 100644 --- a/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/TupleDeepCopierTest.java @@ -37,8 +37,8 @@ public class TupleDeepCopierTest { person.setSiblingnames(Lists.<CharSequence> newArrayList()); Pair<Integer, Person> inputPair = Pair.of(1, person); - DeepCopier<Pair<Integer, Person>> deepCopier = new TupleDeepCopier<Pair<Integer, Person>>( - Avros.pairs(Avros.ints(), Avros.records(Person.class))); + DeepCopier<Pair> deepCopier = new TupleDeepCopier<Pair>( + Pair.class, Avros.ints(), Avros.records(Person.class)); Pair<Integer, Person> deepCopyPair = deepCopier.deepCopy(inputPair); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java index fa1b4c4..6e2d89e 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java @@ -20,6 +20,7 @@ package org.apache.crunch.types.avro; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; import org.apache.crunch.test.Person; import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java 
---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java index 092b89e..28777f5 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java @@ -30,6 +30,7 @@ import java.util.Map; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; import org.apache.crunch.Pair; +import org.apache.crunch.TupleN; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.junit.Test; @@ -228,5 +229,16 @@ public class AvroTypeTest { assertEquals(stringPersonMap, detachedMap); assertNotSame(value, detachedMap.get(key)); } - + + @Test + public void testGetDetachedValue_TupleN(){ + Person person = createPerson(); + AvroType<TupleN> ptype = Avros.tuples(Avros.records(Person.class)); + TupleN tuple = new TupleN(person); + TupleN detachedTuple = ptype.getDetachedValue(tuple); + + assertEquals(tuple, detachedTuple); + assertNotSame(person, detachedTuple.get(0)); + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java index aec19f2..6e66f74 100644 --- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java @@ -17,10 +17,10 @@ */ package org.apache.crunch.types.avro; -import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; 
import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; @@ -39,6 +39,7 @@ import org.apache.crunch.Tuple4; import org.apache.crunch.TupleN; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; +import org.apache.crunch.types.DeepCopier; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.hadoop.io.IntWritable; @@ -224,7 +225,7 @@ public class AvrosTest { @Test public void testIsPrimitive_TruePrimitiveValue(){ - AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT)); + AvroType truePrimitiveAvroType = new AvroType(int.class, Schema.create(Type.INT), new DeepCopier.NoOpDeepCopier()); assertTrue(Avros.isPrimitive(truePrimitiveAvroType)); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java new file mode 100644 index 0000000..a691df2 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableDeepCopierTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.types.writable; + +import static org.junit.Assert.*; + +import org.apache.hadoop.io.Text; +import org.junit.Before; +import org.junit.Test; + + +public class WritableDeepCopierTest { + + private WritableDeepCopier<Text> deepCopier; + + @Before + public void setUp(){ + deepCopier = new WritableDeepCopier<Text>(Text.class); + } + + @Test + public void testDeepCopy(){ + Text text = new Text("value"); + Text deepCopy = deepCopier.deepCopy(text); + + assertEquals(text, deepCopy); + assertNotSame(text, deepCopy); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java index ea0d11a..51a87f5 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritableTypeTest.java @@ -20,18 +20,20 @@ package org.apache.crunch.types.writable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import java.util.Collection; +import java.util.Map; + +import org.apache.crunch.Pair; +import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.junit.Test; -public class 
WritableTypeTest { +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; - @Test - public void testGetDetachedValue_AlreadyMappedWritable() { - WritableType<String, Text> stringType = Writables.strings(); - String value = "test"; - assertSame(value, stringType.getDetachedValue(value)); - } +public class WritableTypeTest { @Test public void testGetDetachedValue_CustomWritable() { @@ -43,4 +45,41 @@ public class WritableTypeTest { assertNotSame(value, detachedValue); } + @Test + public void testGetDetachedValue_Collection() { + Collection<Text> textCollection = Lists.newArrayList(new Text("value")); + WritableType<Collection<Text>, GenericArrayWritable<Text>> ptype = Writables.collections(Writables + .writables(Text.class)); + + Collection<Text> detachedCollection = ptype.getDetachedValue(textCollection); + assertEquals(textCollection, detachedCollection); + assertNotSame(textCollection.iterator().next(), detachedCollection.iterator().next()); + } + + @Test + public void testGetDetachedValue_Tuple() { + Pair<Text, Text> textPair = Pair.of(new Text("one"), new Text("two")); + WritableType<Pair<Text, Text>, TupleWritable> ptype = Writables.pairs(Writables.writables(Text.class), + Writables.writables(Text.class)); + ptype.getOutputMapFn().initialize(); + ptype.getInputMapFn().initialize(); + + Pair<Text, Text> detachedPair = ptype.getDetachedValue(textPair); + assertEquals(textPair, detachedPair); + assertNotSame(textPair.first(), detachedPair.first()); + assertNotSame(textPair.second(), detachedPair.second()); + } + + @Test + public void testGetDetachedValue_Map() { + Map<String, Text> stringTextMap = Maps.newHashMap(); + stringTextMap.put("key", new Text("value")); + + WritableType<Map<String, Text>, MapWritable> ptype = Writables.maps(Writables.writables(Text.class)); + Map<String, Text> detachedMap = ptype.getDetachedValue(stringTextMap); + + assertEquals(stringTextMap, detachedMap); + assertNotSame(stringTextMap.get("key"), 
detachedMap.get("key")); + } + } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2932e9e4/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java b/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java index a8699b3..5396fba 100644 --- a/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java +++ b/crunch/src/test/java/org/apache/crunch/types/writable/WritablesTest.java @@ -253,11 +253,4 @@ public class WritablesTest { assertEquals(writable, ptype.getOutputMapFn().map(java)); } - @Test - public void testDeepCopy() { - Text text = new Text("Test"); - Text copiedText = Writables.deepCopy(text, Text.class); - assertEquals(text, copiedText); - assertNotSame(text, copiedText); - } }
