Repository: crunch Updated Branches: refs/heads/master 6463da421 -> b5b7f48eb
CRUNCH-592: Job fails for null ByteBuffer value in Avro tables. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b5b7f48e Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b5b7f48e Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b5b7f48e Branch: refs/heads/master Commit: b5b7f48ebf5a88b69084dcc08be76b8819fc00a7 Parents: 6463da4 Author: Tom White <[email protected]> Authored: Wed Feb 10 16:02:31 2016 +0000 Committer: Tom White <[email protected]> Committed: Wed Feb 10 16:02:31 2016 +0000 ---------------------------------------------------------------------- .../java/org/apache/crunch/EmitNullAvroIT.java | 19 +++++++++++++++++++ .../java/org/apache/crunch/types/avro/Avros.java | 9 +++++++++ .../org/apache/crunch/types/avro/AvrosTest.java | 2 ++ 3 files changed, 30 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b5b7f48e/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java index 4353b90..e387db5 100644 --- a/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java @@ -19,6 +19,7 @@ package org.apache.crunch; import java.io.Serializable; +import java.nio.ByteBuffer; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.io.From; import org.apache.crunch.io.avro.AvroFileTarget; @@ -46,4 +47,22 @@ public class EmitNullAvroIT extends CrunchTestSupport implements Serializable { p.done(); } + + @Test + public void testNullableAvroPTable_ByteBuffer() throws Exception { + final Pipeline p = new MRPipeline(EmitNullAvroIT.class, tempDir.getDefaultConfiguration()); + final Path outDir = tempDir.getPath("out"); + final PCollection<String> input = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt"))); + + input.parallelDo(new MapFn<String, Pair<String, ByteBuffer>>() { + @Override + public Pair<String, ByteBuffer> map(final String input) { + return new Pair<String, ByteBuffer>("first name", null); + } + }, Avros.tableOf(Avros.strings(), Avros.bytes())) + .groupByKey() + .write(new AvroFileTarget(outDir), Target.WriteMode.APPEND); + + p.done(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/b5b7f48e/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 989aa24..f9afe05 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -155,6 +155,9 @@ public class Avros { public static MapFn<CharSequence, String> UTF8_TO_STRING = new MapFn<CharSequence, String>() { @Override public String map(CharSequence input) { + if (input == null) { + return null; + } return input.toString(); } }; @@ -162,6 +165,9 @@ public class Avros { public static MapFn<String, Utf8> STRING_TO_UTF8 = new MapFn<String, Utf8>() { @Override public Utf8 map(String input) { + if (input == null) { + return null; + } return new Utf8(input); } }; @@ -169,6 +175,9 @@ public class Avros { public static MapFn<Object, ByteBuffer> BYTES_IN = new MapFn<Object, ByteBuffer>() { @Override public ByteBuffer map(Object input) { + if (input == null) { + return null; + } if (input instanceof ByteBuffer) { return (ByteBuffer) input; } http://git-wip-us.apache.org/repos/asf/crunch/blob/b5b7f48e/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java index 46c295e..c002012 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvrosTest.java @@ -68,6 +68,7 @@ public class AvrosTest { String s = "abc"; Utf8 w = new Utf8(s); testInputOutputFn(Avros.strings(), s, w); + testInputOutputFn(Avros.strings(), null, null); } @Test @@ -105,6 +106,7 @@ public class AvrosTest { byte[] bytes = new byte[] { 17, 26, -98 }; ByteBuffer bb = ByteBuffer.wrap(bytes); testInputOutputFn(Avros.bytes(), bb, bb); + testInputOutputFn(Avros.bytes(), null, null); } @Test
