Repository: crunch Updated Branches: refs/heads/master 26f6bb2d3 -> f81b5c253
CRUNCH-382 Add DeepCopier for Avro ByteBuffers Replace the NoOpDeepCopier for Avro ByteBuffers with a real DeepCopier. ByteBuffers are mutable and are reused within Avro, so places that call PType#getDetachedValue on Avro bytes (ByteBuffer) values will now work as expected. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f81b5c25 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f81b5c25 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f81b5c25 Branch: refs/heads/master Commit: f81b5c253fb6380e5cd68d320a9a74f0b2a2c5d0 Parents: 26f6bb2 Author: Gabriel Reid <[email protected]> Authored: Fri Apr 25 21:56:31 2014 +0200 Committer: Gabriel Reid <[email protected]> Committed: Fri Apr 25 22:36:51 2014 +0200 ---------------------------------------------------------------------- .../crunch/types/avro/AvroDeepCopier.java | 23 ++++++++++++++++++++ .../org/apache/crunch/types/avro/Avros.java | 2 +- .../crunch/types/avro/AvroDeepCopierTest.java | 16 ++++++++++++-- 3 files changed, 38 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java index 1eca1b8..56ec459 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroDeepCopier.java @@ -19,6 +19,7 @@ package org.apache.crunch.types.avro; import java.io.ByteArrayOutputStream; import java.io.Serializable; +import java.nio.ByteBuffer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -205,4 +206,26 @@ abstract class AvroDeepCopier<T> implements 
DeepCopier<T>, Serializable { } } + /** + * Copies ByteBuffers that are stored in Avro: the one built-in case where the serialization type is different than the + * output type and the output type is mutable. The bytes between position and limit are copied; a null source yields null. + */ + public static class AvroByteBufferDeepCopier implements DeepCopier<ByteBuffer> { + + public static final AvroByteBufferDeepCopier INSTANCE = new AvroByteBufferDeepCopier(); + + @Override + public void initialize(Configuration conf) { + // No-op + } + + @Override + public ByteBuffer deepCopy(ByteBuffer source) { + if (source == null) { return null; } + byte[] copy = new byte[source.remaining()]; + source.duplicate().get(copy); // works for sliced, read-only and direct buffers; duplicate() leaves source's position untouched + return ByteBuffer.wrap(copy); + } + } + + } http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 266cb12..1fcb30e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -188,7 +188,7 @@ public class Avros { private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN); private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class, Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), - NoOpDeepCopier.<ByteBuffer>create(), AvroType.AvroRecordType.GENERIC); + AvroDeepCopier.AvroByteBufferDeepCopier.INSTANCE, AvroType.AvroRecordType.GENERIC); private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder() .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats) 
http://git-wip-us.apache.org/repos/asf/crunch/blob/f81b5c25/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java index 37c13c0..9d43f0c 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroDeepCopierTest.java @@ -17,20 +17,21 @@ */ package org.apache.crunch.types.avro; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; +import java.nio.ByteBuffer; import java.util.List; +import com.google.common.collect.Lists; import org.apache.avro.generic.GenericData.Record; import org.apache.crunch.test.Person; import org.apache.crunch.types.avro.AvroDeepCopier.AvroSpecificDeepCopier; import org.apache.hadoop.conf.Configuration; import org.junit.Test; -import com.google.common.collect.Lists; - public class AvroDeepCopierTest { @Test @@ -104,4 +105,15 @@ public class AvroDeepCopierTest { assertNull(deepCopyPerson); } + @Test + public void testDeepCopy_ByteBuffer() { + byte[] bytes = new byte[] { 1, 2, 3 }; + ByteBuffer buffer = ByteBuffer.wrap(bytes); + ByteBuffer deepCopied = AvroDeepCopier.AvroByteBufferDeepCopier.INSTANCE.deepCopy(buffer); + + // Change the original array to make sure we've really got a copy + bytes[0] = 0; + assertArrayEquals(new byte[] { 1, 2, 3 }, deepCopied.array()); + assertNotSame(buffer, deepCopied); + } }
