Author: cutting
Date: Wed Jul 27 22:52:46 2011
New Revision: 1151660
URL: http://svn.apache.org/viewvc?rev=1151660&view=rev
Log:
AVRO-864. Java: Fix reflect to be able to write unions containing generic
and/or specific records. Contributed by Isabel Drost.
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1151660&r1=1151659&r2=1151660&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Jul 27 22:52:46 2011
@@ -45,6 +45,9 @@ Avro 1.6.0 (unreleased)
AVRO-824. Java: Fix usage message of BinaryFragmentToJsonTool.
(Jakob Homan via cutting)
+ AVRO-864. Java: Fix reflect to be able to write unions containing
+ generic and/or specific records. (Isabel Drost & cutting)
+
Avro 1.5.2 (unreleased)
NEW FEATURES
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1151660&r1=1151659&r2=1151660&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Wed Jul 27 22:52:46 2011
@@ -41,6 +41,7 @@ import org.apache.avro.Schema;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericContainer;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.FixedSize;
import org.apache.avro.io.BinaryData;
@@ -101,6 +102,7 @@ public class ReflectData extends Specifi
@Override
protected boolean isRecord(Object datum) {
if (datum == null) return false;
+ if (super.isRecord(datum)) return true; // AVRO-864: accept anything the superclass already treats as a record (presumably generic/specific records) before reflecting on datum's class
return getSchema(datum.getClass()).getType() == Schema.Type.RECORD;
}
@@ -120,6 +122,8 @@ public class ReflectData extends Specifi
@Override
protected Schema getRecordSchema(Object record) {
+ if (record instanceof GenericContainer) // AVRO-864: defer to the superclass for GenericContainer data (presumably it uses the container's own schema) instead of a schema reflected from the class
+ return super.getRecordSchema(record);
return getSchema(record.getClass());
}
Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java?rev=1151660&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestGenericJob.java Wed Jul 27 22:52:46 2011
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroOutputFormat;
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.avro.mapred.Pair;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.codehaus.jackson.node.JsonNodeFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs a map-only job whose output values are generic records containing an
+ * array of a union of record types, written through {@link AvroOutputFormat}.
+ * Added together with the AVRO-864 fix (writing unions that contain generic
+ * and/or specific records); the job is expected to fail if such data cannot
+ * be serialized.
+ */
+@SuppressWarnings("deprecation")
+public class TestGenericJob {
+  /**
+   * Scratch directory for job input and output. Built with File(parent, child)
+   * so a separator is inserted whether or not test.dir ends with one; plain
+   * string concatenation produced ".target/testGenericJob" when test.dir was
+   * unset.
+   */
+  private static final String dir =
+    new File(System.getProperty("test.dir", "."),
+             "target/testGenericJob").getPath();
+
+  /**
+   * Builds the top-level "Container" record schema: a single field "Optional"
+   * holding an array of a union of inner record types, defaulting to an empty
+   * array.
+   */
+  private static Schema createSchema() {
+    List<Field> fields = new ArrayList<Field>();
+    fields.add(new Field("Optional", createArraySchema(), "",
+                         JsonNodeFactory.instance.arrayNode()));
+
+    Schema recordSchema =
+      Schema.createRecord("Container", "", "org.apache.avro.mapred", false);
+    recordSchema.setFields(fields);
+    return recordSchema;
+  }
+
+  /**
+   * Builds an array schema whose items are a union of five distinct inner
+   * record types named "optional_field_0" through "optional_field_4".
+   */
+  private static Schema createArraySchema() {
+    List<Schema> schemas = new ArrayList<Schema>();
+    for (int i = 0; i < 5; i++) {
+      schemas.add(createInnerSchema("optional_field_" + i));
+    }
+    Schema unionSchema = Schema.createUnion(schemas);
+    return Schema.createArray(unionSchema);
+  }
+
+  /**
+   * Builds a record schema named {@code name} (empty namespace) containing a
+   * single long field, also named {@code name}, with default value 0.
+   */
+  private static Schema createInnerSchema(String name) {
+    Schema innerRecord = Schema.createRecord(name, "", "", false);
+    innerRecord.setFields
+      (Arrays.asList(new Field(name, Schema.create(Type.LONG), "",
+                               JsonNodeFactory.instance.numberNode(0L))));
+    return innerRecord;
+  }
+
+  /**
+   * Creates a small text input file. Its contents are ignored by the mapper;
+   * it only needs to be non-empty so the framework actually calls map().
+   */
+  @Before
+  public void setup() throws IOException {
+    File indir = new File(dir);
+    indir.mkdirs();
+    File infile = new File(indir, "in");
+    RandomAccessFile file = new RandomAccessFile(infile, "rw");
+    try {
+      // writeBytes emits one byte per char, giving plain ASCII text lines;
+      // writeChars would emit two bytes (UTF-16) per char, including NULs.
+      file.writeBytes("aa bb cc\ndd ee ff\n");
+    } finally {
+      file.close();
+    }
+  }
+
+  /** Removes the scratch directory, including the job output. */
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(new File(dir));
+  }
+
+  /**
+   * Ignores the input value and, for every input record, emits a Pair of the
+   * input key (the line offset under TextInputFormat) and a "Container" record
+   * whose array holds a single "optional_field_1" union branch.
+   */
+  static class AvroTestConverter
+    extends MapReduceBase
+    implements Mapper<LongWritable, Text,
+               AvroWrapper<Pair<Long, GenericData.Record>>, NullWritable> {
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<AvroWrapper<Pair<Long, GenericData.Record>>, NullWritable> out,
+                    Reporter reporter) throws IOException {
+      GenericData.Record optionalEntry =
+        new GenericData.Record(createInnerSchema("optional_field_1"));
+      optionalEntry.put("optional_field_1", 0L);
+      GenericData.Array<GenericData.Record> array =
+        new GenericData.Array<GenericData.Record>(1, createArraySchema());
+      array.add(optionalEntry);
+
+      GenericData.Record container = new GenericData.Record(createSchema());
+      container.put("Optional", array);
+
+      out.collect(new AvroWrapper<Pair<Long, GenericData.Record>>
+                  (new Pair<Long, GenericData.Record>(key.get(), container)),
+                  NullWritable.get());
+    }
+  }
+
+  /**
+   * Runs the map-only job end to end with Avro output. There are no explicit
+   * assertions: the test relies on JobClient.runJob throwing when the job
+   * fails (NOTE(review): confirm for the Hadoop version in use).
+   */
+  @Test
+  public void testJob() throws Exception {
+    JobConf job = new JobConf();
+    Path outputPath = new Path(dir + "/out");
+    // Recursively remove stale output; the one-arg delete(Path) is deprecated.
+    outputPath.getFileSystem(job).delete(outputPath, true);
+
+    job.setInputFormat(TextInputFormat.class);
+    FileInputFormat.setInputPaths(job, dir + "/in");
+
+    job.setMapperClass(AvroTestConverter.class);
+    job.setNumReduceTasks(0);
+
+    FileOutputFormat.setOutputPath(job, outputPath);
+    AvroJob.setOutputSchema(job,
+                            Pair.getPairSchema(Schema.create(Schema.Type.LONG),
+                                               createSchema()));
+    job.setOutputFormat(AvroOutputFormat.class);
+
+    JobClient.runJob(job);
+  }
+}
+
+