Repository: crunch
Updated Branches: refs/heads/master a8ce5385e -> 772f7f2b2
CRUNCH-416 Use built-in Avro deep copying Replace serialization-based deep copying for Avro specific and generic records. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/772f7f2b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/772f7f2b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/772f7f2b Branch: refs/heads/master Commit: 772f7f2b21c9c817f481fd7e2a4f030378cadb3d Parents: a8ce538 Author: Gabriel Reid <[email protected]> Authored: Sun Jun 8 22:19:20 2014 +0200 Committer: Gabriel Reid <[email protected]> Committed: Mon Jun 9 22:03:36 2014 +0200 ---------------------------------------------------------------------- .../crunch/types/avro/AvroDeepCopier.java | 147 +++++++------------ .../org/apache/crunch/types/avro/Avros.java | 6 +- .../crunch/types/avro/AvroDeepCopierTest.java | 29 ++-- .../AvroSpecificDeepCopierClassloaderTest.java | 2 +- 4 files changed, 75 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/772f7f2b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 855aa79..dcc4a19 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -24,16 +24,13 @@ import java.nio.ByteBuffer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; import 
org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificData; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.types.DeepCopier; import org.apache.hadoop.conf.Configuration; @@ -48,13 +45,8 @@ import org.apache.hadoop.conf.Configuration; abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { private String jsonSchema; - private transient Configuration conf; + protected transient Configuration conf; private transient Schema schema; - private BinaryEncoder binaryEncoder; - private BinaryDecoder binaryDecoder; - - private transient DatumWriter<T> datumWriter; - private transient DatumReader<T> datumReader; public AvroDeepCopier(Schema schema) { this.jsonSchema = schema.toString(); @@ -72,39 +64,19 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { this.conf = conf; } - protected abstract T createCopyTarget(); - - protected abstract DatumWriter<T> createDatumWriter(Configuration conf); - - protected abstract DatumReader<T> createDatumReader(Configuration conf); - /** * Deep copier for Avro specific data objects. 
*/ public static class AvroSpecificDeepCopier<T> extends AvroDeepCopier<T> { - private Class<T> valueClass; - - public AvroSpecificDeepCopier(Class<T> valueClass, Schema schema) { + public AvroSpecificDeepCopier(Schema schema) { super(schema); - this.valueClass = valueClass; - } - - @Override - protected T createCopyTarget() { - return createNewInstance(valueClass); - } - - @Override - protected DatumWriter<T> createDatumWriter(Configuration conf) { - return new SpecificDatumWriter<T>(valueClass); } @Override - protected DatumReader<T> createDatumReader(Configuration conf) { - return new SpecificDatumReader<T>(valueClass); + public T deepCopy(T source) { + return SpecificData.get().deepCopy(getSchema(), source); } - } /** @@ -112,25 +84,13 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { */ public static class AvroGenericDeepCopier extends AvroDeepCopier<Record> { - private transient Schema schema; - public AvroGenericDeepCopier(Schema schema) { super(schema); } @Override - protected Record createCopyTarget() { - return new GenericData.Record(getSchema()); - } - - @Override - protected DatumReader<Record> createDatumReader(Configuration conf) { - return new GenericDatumReader<Record>(getSchema()); - } - - @Override - protected DatumWriter<Record> createDatumWriter(Configuration conf) { - return new GenericDatumWriter<Record>(getSchema()); + public Record deepCopy(Record source) { + return GenericData.get().deepCopy(getSchema(), source); } } @@ -139,72 +99,71 @@ abstract class AvroDeepCopier<T> implements DeepCopier<T>, Serializable { */ public static class AvroReflectDeepCopier<T> extends AvroDeepCopier<T> { - private Class<T> valueClass; + private DatumReader<T> datumReader; + private DatumWriter<T> datumWriter; + private BinaryEncoder binaryEncoder; + private BinaryDecoder binaryDecoder; + private final Class<T> valueClass; public AvroReflectDeepCopier(Class<T> valueClass, Schema schema) { super(schema); this.valueClass = valueClass; } 
- @Override - protected T createCopyTarget() { - return createNewInstance(valueClass); - } - - @Override protected DatumReader<T> createDatumReader(Configuration conf) { return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getReader(getSchema()); } - @Override protected DatumWriter<T> createDatumWriter(Configuration conf) { return AvroMode.REFLECT.withFactoryFromConfiguration(conf).getWriter(getSchema()); } - } - /** - * Create a deep copy of an Avro value. - * - * @param source The value to be copied - * @return The deep copy of the value - */ - @Override - public T deepCopy(T source) { - - if (source == null) { - return null; - } - - if (datumReader == null) { - datumReader = createDatumReader(conf); - } - if (datumWriter == null) { - datumWriter = createDatumWriter(conf); - } - ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); - binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); - T target = createCopyTarget(); - try { - datumWriter.write(source, binaryEncoder); - binaryEncoder.flush(); - binaryDecoder = DecoderFactory.get() - .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); - return datumReader.read(target, binaryDecoder); - } catch (Exception e) { - throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); + /** + * Create a deep copy of an Avro value. 
+ * + * @param source The value to be copied + * @return The deep copy of the value + */ + @Override + public T deepCopy(T source) { + + if (source == null) { + return null; + } + + if (datumReader == null) { + datumReader = createDatumReader(conf); + } + if (datumWriter == null) { + datumWriter = createDatumWriter(conf); + } + ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream(); + binaryEncoder = EncoderFactory.get().binaryEncoder(byteOutStream, binaryEncoder); + T target = createNewInstance(valueClass); + try { + datumWriter.write(source, binaryEncoder); + binaryEncoder.flush(); + binaryDecoder = DecoderFactory.get() + .binaryDecoder(byteOutStream.toByteArray(), binaryDecoder); + return datumReader.read(target, binaryDecoder); + } catch (Exception e) { + throw new CrunchRuntimeException("Error while deep copying avro value " + source, e); + } } - } - protected T createNewInstance(Class<T> targetClass) { - try { - return targetClass.newInstance(); - } catch (InstantiationException e) { - throw new CrunchRuntimeException(e); - } catch (IllegalAccessException e) { - throw new CrunchRuntimeException(e); + protected T createNewInstance(Class<T> targetClass) { + try { + return targetClass.newInstance(); + } catch (InstantiationException e) { + throw new CrunchRuntimeException(e); + } catch (IllegalAccessException e) { + throw new CrunchRuntimeException(e); + } } } + + /** * Copies ByteBuffers that are stored in Avro. 
A specific case is needed here * because ByteBuffers are the one built-in case where the serialization type is different http://git-wip-us.apache.org/repos/asf/crunch/blob/772f7f2b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 62945b1..4b2c67b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -252,8 +252,8 @@ public class Avros { } public static final AvroType<GenericData.Record> generics(Schema schema) { - return new AvroType<GenericData.Record>(GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier( - schema)); + return new AvroType<GenericData.Record>( + GenericData.Record.class, schema, new AvroDeepCopier.AvroGenericDeepCopier(schema)); } public static final <T> AvroType<T> containers(Class<T> clazz) { @@ -266,7 +266,7 @@ public class Avros { public static final <T extends SpecificRecord> AvroType<T> specifics(Class<T> clazz) { T t = ReflectionUtils.newInstance(clazz, null); Schema schema = t.getSchema(); - return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(clazz, schema)); + return new AvroType<T>(clazz, schema, new AvroDeepCopier.AvroSpecificDeepCopier<T>(schema)); } public static final <T> AvroType<T> reflects(Class<T> clazz) { http://git-wip-us.apache.org/repos/asf/crunch/blob/772f7f2b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java index 9d43f0c..795e2b4 100644 --- 
a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java @@ -41,27 +41,35 @@ public class AvroDeepCopierTest { person.age = 42; person.siblingnames = Lists.<CharSequence> newArrayList(); - Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$) - .deepCopy(person); + Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.SCHEMA$).deepCopy(person); assertEquals(person, deepCopyPerson); assertNotSame(person, deepCopyPerson); } @Test + public void testDeepCopySpecific_Null() { + assertNull(new AvroSpecificDeepCopier<Person>(Person.SCHEMA$).deepCopy(null)); + } + + @Test public void testDeepCopyGeneric() { Record record = new Record(Person.SCHEMA$); record.put("name", "John Doe"); record.put("age", 42); record.put("siblingnames", Lists.newArrayList()); - Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$) - .deepCopy(record); + Record deepCopyRecord = new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(record); assertEquals(record, deepCopyRecord); assertNotSame(record, deepCopyRecord); } + @Test + public void testDeepCopyGeneric_Null() { + assertNull(new AvroDeepCopier.AvroGenericDeepCopier(Person.SCHEMA$).deepCopy(null)); + } + static class ReflectedPerson { String name; int age; @@ -94,15 +102,14 @@ public class AvroDeepCopierTest { assertNotSame(person, deepCopyPerson); } - - @Test - public void testDeepCopy_Null() { - Person person = null; - Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$) - .deepCopy(person); + @Test + public void testDeepCopyReflect_Null() { + AvroDeepCopier<ReflectedPerson> avroDeepCopier = new AvroDeepCopier.AvroReflectDeepCopier<ReflectedPerson>( + ReflectedPerson.class, Avros.reflects(ReflectedPerson.class).getSchema()); + avroDeepCopier.initialize(new Configuration()); - assertNull(deepCopyPerson); + 
assertNull(avroDeepCopier.deepCopy(null)); } @Test http://git-wip-us.apache.org/repos/asf/crunch/blob/772f7f2b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java index 11b0a78..26a8c6a 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroSpecificDeepCopierClassloaderTest.java @@ -37,7 +37,7 @@ public class AvroSpecificDeepCopierClassloaderTest { person.age = 42; person.siblingnames = Lists.newArrayList(); - Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.class, Person.SCHEMA$) + Person deepCopyPerson = new AvroSpecificDeepCopier<Person>(Person.SCHEMA$) .deepCopy(person); assertEquals(person, deepCopyPerson);
