Repository: crunch Updated Branches: refs/heads/master 9146d10da -> 95e92fc89
CRUNCH-547: Properly handle nullability for Avro union types Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/95e92fc8 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/95e92fc8 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/95e92fc8 Branch: refs/heads/master Commit: 95e92fc898329b5b762e0a76be1b3396d61430b4 Parents: 9146d10 Author: Josh Wills <[email protected]> Authored: Wed Jul 22 10:38:56 2015 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jul 23 10:15:09 2015 -0700 ---------------------------------------------------------------------- .../apache/crunch/types/avro/AvroTableType.java | 12 ++---------- .../org/apache/crunch/types/avro/Avros.java | 20 ++++++++++++++++++-- .../crunch/types/avro/AvroTableTypeTest.java | 20 ++++++++++++++++++++ 3 files changed, 40 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/95e92fc8/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index 30db1ed..02b9cfb 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -17,7 +17,6 @@ */ package org.apache.crunch.types.avro; -import com.google.common.collect.ImmutableList; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.crunch.MapFn; @@ -70,7 +69,7 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType< valueMapFn.initialize(); pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema( new Schema.Parser().parse(firstJson), - nullable(new Schema.Parser().parse(secondJson))).toString(); + Avros.allowNulls(new Schema.Parser().parse(secondJson))).toString(); } @Override @@ -124,7 +123,7 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType< public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) { super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), - nullable(valueType.getSchema())), + Avros.allowNulls(valueType.getSchema())), new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(Pair.class, keyType, valueType), null, keyType, valueType); @@ -151,11 +150,4 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType< public Pair<K, V> getDetachedValue(Pair<K, V> value) { return PTables.getDetachedValue(this, value); } - - private static Schema nullable(Schema schema) { - if (schema.getType() == Schema.Type.NULL) { - return schema; - } - return Schema.createUnion(ImmutableList.of(schema, Schema.create(Schema.Type.NULL))); - } } http://git-wip-us.apache.org/repos/asf/crunch/blob/95e92fc8/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 3642769..989aa24 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -919,11 +919,27 @@ public class Avros { private static final Schema NULL_SCHEMA = Schema.create(Type.NULL); - private static Schema allowNulls(Schema base) { + static Schema allowNulls(Schema base) { if (NULL_SCHEMA.equals(base)) { return base; + } else if (base.getType() == Type.UNION) { + List<Schema> types = Lists.newArrayList(); + boolean hasNull = false; + for (Schema s : base.getTypes()) { + if (s.getType() == Schema.Type.NULL) { + hasNull = true; + } + types.add(s); + } + if (hasNull) { + return base; + } else { + types.add(Schema.create(Schema.Type.NULL)); + return Schema.createUnion(types); + } + } else { + return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA)); } - return Schema.createUnion(ImmutableList.of(base, NULL_SCHEMA)); } private static class ReflectGenericRecord extends GenericData.Record { http://git-wip-us.apache.org/repos/asf/crunch/blob/95e92fc8/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java index 35d4e5b..e5f2dea 100644 --- a/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java @@ -23,6 +23,9 @@ import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import com.google.common.collect.ImmutableList; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.crunch.Pair; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; @@ -31,6 +34,8 @@ import org.junit.Test; import com.google.common.collect.Lists; +import java.util.List; + public class AvroTableTypeTest { @Test @@ -55,6 +60,21 @@ public class AvroTableTypeTest { } @Test + public void testUnionValueType() { + List<Schema> schemas = Lists.newArrayList(); + schemas.add(Schema.create(Schema.Type.BOOLEAN)); + schemas.add(Schema.create(Schema.Type.INT)); + Schema union = Schema.createUnion(schemas); + boolean success = false; + try { + Avros.tableOf(Avros.longs(), Avros.generics(union)); + success = true; + } catch (Exception shouldNotBeThrown) { + } + assertTrue("Union type was properly made nullable", success); + } + + @Test public void testIsReflect_ContainsReflectKey() { assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).hasReflect()); }
