Repository: crunch Updated Branches: refs/heads/master 2e1350951 -> f17d7f7b0
CRUNCH-459: Allowing nulls as values in Avro PTableTypes Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/f17d7f7b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/f17d7f7b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/f17d7f7b Branch: refs/heads/master Commit: f17d7f7b017ae30ab8c7a164e174cc5280daab3d Parents: 2e13509 Author: Josh Wills <[email protected]> Authored: Thu Feb 6 12:29:36 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Thu Aug 7 18:52:35 2014 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/EmitNullAvroIT.java | 49 ++++++++++++++++++++ .../apache/crunch/types/avro/AvroTableType.java | 14 +++++- 2 files changed, 61 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/f17d7f7b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java new file mode 100644 index 0000000..4353b90 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/EmitNullAvroIT.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import java.io.Serializable; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.From; +import org.apache.crunch.io.avro.AvroFileTarget; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.test.Person; +import org.apache.crunch.types.avro.Avros; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +public class EmitNullAvroIT extends CrunchTestSupport implements Serializable { + @Test + public void testNullableAvroPTable() throws Exception { + // This test fails if values are not nullable + final Pipeline p = new MRPipeline(EmitNullAvroIT.class, tempDir.getDefaultConfiguration()); + final Path outDir = tempDir.getPath("out"); + final PCollection<String> input = p.read(From.textFile(tempDir.copyResourceFileName("docs.txt"))); + + input.parallelDo(new MapFn<String, Pair<String, Person>>() { + @Override + public Pair<String, Person> map(final String input) { + return new Pair<String, Person>("first name", null); + } + }, Avros.tableOf(Avros.strings(), Avros.records(Person.class))) + .write(new AvroFileTarget(outDir), Target.WriteMode.APPEND); + + org.junit.Assert.assertTrue("pipeline writing null Avro values failed", p.done().succeeded()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/f17d7f7b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index 
00047cc..30db1ed 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.types.avro; +import com.google.common.collect.ImmutableList; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.crunch.MapFn; @@ -68,7 +69,8 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType< keyMapFn.initialize(); valueMapFn.initialize(); pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema( - new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString(); + new Schema.Parser().parse(firstJson), + nullable(new Schema.Parser().parse(secondJson))).toString(); } @Override @@ -122,7 +124,8 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType< public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) { super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), - valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(), + nullable(valueType.getSchema())), + new IndexedRecordToPair(keyType.getInputMapFn(), valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(Pair.class, keyType, valueType), null, keyType, valueType); this.keyType = keyType;
@@ -148,4 +151,30 @@ class AvroTableType<K, V> extends BaseAvroTableType<K, V> implements PTableType<
   public Pair<K, V> getDetachedValue(Pair<K, V> value) {
     return PTables.getDetachedValue(this, value);
   }
+
+  /**
+   * Returns a schema that also admits {@code null}, so PTable values may be null.
+   *
+   * <p>Avro rejects a union nested directly inside another union, and a union with two
+   * {@code null} branches. So a schema that already admits null is returned unchanged, and
+   * any other union is widened with a {@code null} branch rather than wrapped.
+   */
+  private static Schema nullable(Schema schema) {
+    switch (schema.getType()) {
+      case NULL:
+        return schema;
+      case UNION:
+        for (Schema branch : schema.getTypes()) {
+          if (branch.getType() == Schema.Type.NULL) {
+            return schema;
+          }
+        }
+        return Schema.createUnion(ImmutableList.<Schema>builder()
+            .addAll(schema.getTypes())
+            .add(Schema.create(Schema.Type.NULL))
+            .build());
+      default:
+        return Schema.createUnion(ImmutableList.of(schema, Schema.create(Schema.Type.NULL)));
+    }
+  }
 }
