Author: cutting
Date: Mon May 11 18:19:46 2009
New Revision: 773638
URL: http://svn.apache.org/viewvc?rev=773638&view=rev
Log:
Add Java support for default values.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Mon May 11 18:19:46 2009
@@ -15,6 +15,8 @@
representations, especially records with integer-indexed fields.
(Hong Tang via cutting)
+ AVRO-8. Add Java support for default values. (cutting)
+
IMPROVEMENTS
AVRO-11. Re-implement specific and reflect datum readers and
Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Mon May 11 18:19:46 2009
@@ -100,9 +100,29 @@
<ul>
<li><code>name</code>: a JSON string providing the name
of the field (required), and </li>
- <li><code>type</code>A JSON object defining a schema, or
+ <li><code>type:</code> A JSON object defining a schema, or
a JSON string naming a record definition
(required).</li>
+ <li><code>default:</code> A default value for this
+ field, used when reading instances that lack this
+ field (optional). Permitted values depend on the
+ field's schema type, according to the table below.
+ Default values for union fields correspond to the first
+ schema in the union.
+ <table class="right">
+ <caption>field default values</caption>
+ <tr><th>avro type</th><th>json type</th><th>example</th></tr>
+ <tr><td>string</td><td>string</td><td>"foo"</td></tr>
+ <tr><td>bytes</td><td>base64 string</td><td>"c3VyZS4="</td></tr>
+ <tr><td>int,long</td><td>integer</td><td>1</td></tr>
+ <tr><td>float,double</td><td>number</td><td>1.1</td></tr>
+ <tr><td>boolean</td><td>boolean</td><td>true</td></tr>
+ <tr><td>null</td><td>null</td><td>null</td></tr>
+ <tr><td>record</td><td>object</td><td>{"a": 1}</td></tr>
+ <tr><td>array</td><td>array</td><td>[1]</td></tr>
+ <tr><td>map</td><td>object</td><td>{"a": 1}</td></tr>
+ </table>
+ </li>
</ul>
</li>
</ul>
Modified: hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Protocol.java Mon May 11 18:19:46 2009
@@ -32,6 +32,8 @@
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.map.JsonNode;
+import org.apache.avro.Schema.Field;
+
/** A set of messages forming an application protocol.
* <p> A protocol consists of:
* <ul>
@@ -266,7 +268,7 @@
JsonNode requestNode = json.getFieldValue("request");
if (requestNode == null || !requestNode.isArray())
throw new SchemaParseException("No request specified: "+json);
- Map<String,Schema> fields = new LinkedHashMap<String,Schema>();
+ LinkedHashMap<String,Field> fields = new LinkedHashMap<String,Field>();
for (JsonNode field : requestNode) {
JsonNode fieldNameNode = field.getFieldValue("name");
if (fieldNameNode == null)
@@ -274,7 +276,9 @@
JsonNode fieldTypeNode = field.getFieldValue("type");
if (fieldTypeNode == null)
throw new SchemaParseException("No param type: "+field);
-
fields.put(fieldNameNode.getTextValue(),Schema.parse(fieldTypeNode,types));
+ fields.put(fieldNameNode.getTextValue(),
+ new Field(Schema.parse(fieldTypeNode,types),
+ field.getFieldValue("default")));
}
Schema request = Schema.createRecord(fields);
Modified: hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Schema.java?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Schema.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Schema.java Mon May 11 18:19:46 2009
@@ -84,7 +84,7 @@
}
/** Create an anonymous record schema. */
- public static Schema createRecord(Map<String,Schema> fields) {
+ public static Schema createRecord(LinkedHashMap<String,Field> fields) {
Schema result = createRecord(null, null, false);
result.setFields(fields);
return result;
@@ -125,7 +125,7 @@
}
/** If this is a record, set its fields. */
- public void setFields(Map<String,Schema> fields) {
+ public void setFields(LinkedHashMap<String,Field> fields) {
throw new AvroRuntimeException("Not a record: "+this);
}
@@ -173,20 +173,26 @@
/** A field within a record. */
public static class Field {
- int position;
- Schema schema;
- Field(int pos, Schema schema) {
- this.position = pos;
+ private int position = -1;
+ private final Schema schema;
+ private final JsonNode defaultValue;
+ public Field(Schema schema, JsonNode defaultValue) {
this.schema = schema;
+ this.defaultValue = defaultValue;
}
/** The position of this field within the record. */
public int pos() { return position; }
/** This field's {@link Schema}. */
public Schema schema() { return schema; }
+ public JsonNode defaultValue() { return defaultValue; }
public boolean equals(Object other) {
if (!(other instanceof Field)) return false;
Field that = (Field) other;
- return (position == that.position) && (schema.equals(that.schema));
+ return (position == that.position) &&
+ (schema.equals(that.schema)) &&
+ (defaultValue == null
+ ? that.defaultValue == null
+ : (defaultValue.equals(that.defaultValue)));
}
}
@@ -209,15 +215,20 @@
public Iterable<Map.Entry<String, Schema>> getFieldSchemas() {
return fieldSchemas;
}
- public void setFields(Map<String,Schema> fields) {
+ public void setFields(LinkedHashMap<String,Field> fields) {
if (this.fields != null)
throw new AvroRuntimeException("Fields are already set");
- this.fields = new LinkedHashMap<String, Field>();
int i = 0;
- this.fieldSchemas = fields.entrySet();
- for (Map.Entry<String, Schema> field : this.fieldSchemas) {
- this.fields.put(field.getKey(), new Field(i++, field.getValue()));
+ LinkedHashMap<String,Schema> schemas = new
LinkedHashMap<String,Schema>();
+ for (Map.Entry<String, Field> pair : fields.entrySet()) {
+ Field f = pair.getValue();
+ if (f.position != -1)
+ throw new AvroRuntimeException("Field already used: "+f);
+ f.position = i++;
+ schemas.put(pair.getKey(), f.schema());
}
+ this.fields = fields;
+ this.fieldSchemas = schemas.entrySet();
}
public boolean equals(Object o) {
if (o == this) return true;
@@ -233,11 +244,15 @@
+(name==null?"":"\"name\": \""+name+"\", ")
+"\"fields\": [");
int count = 0;
- for (Map.Entry<String, Schema> entry : fieldSchemas) {
+ for (Map.Entry<String, Field> entry : fields.entrySet()) {
buffer.append("{\"name\": \"");
buffer.append(entry.getKey());
buffer.append("\", \"type\": ");
- buffer.append(entry.getValue().toString(names));
+ buffer.append(entry.getValue().schema().toString(names));
+ if (entry.getValue().defaultValue() != null) {
+ buffer.append("\", \"default\": ");
+ buffer.append(entry.getValue().defaultValue());
+ }
buffer.append("}");
if (++count < fields.size())
buffer.append(", ");
@@ -446,7 +461,7 @@
throw new SchemaParseException("No type: "+schema);
String type = typeNode.getTextValue();
if (type.equals("record") || type.equals("error")) { // record
- Map<String,Schema> fields = new LinkedHashMap<String,Schema>();
+ LinkedHashMap<String,Field> fields = new LinkedHashMap<String,Field>();
JsonNode nameNode = schema.getFieldValue("name");
String name = nameNode != null ? nameNode.getTextValue() : null;
JsonNode spaceNode = schema.getFieldValue("namespace");
@@ -464,7 +479,9 @@
JsonNode fieldTypeNode = field.getFieldValue("type");
if (fieldTypeNode == null)
throw new SchemaParseException("No field type: "+field);
- fields.put(fieldNameNode.getTextValue(), parse(fieldTypeNode,
names));
+ Schema fieldSchema = parse(fieldTypeNode, names);
+ fields.put(fieldNameNode.getTextValue(),
+ new Field(fieldSchema, field.getFieldValue("default")));
}
result.setFields(fields);
return result;
Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Mon May 11 18:19:46 2009
@@ -25,6 +25,7 @@
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.util.Utf8;
@@ -200,9 +201,9 @@
if (datum instanceof GenericRecord) {
@SuppressWarnings(value="unchecked")
GenericRecord record = (GenericRecord)datum;
- Map<String,Schema> fields = new LinkedHashMap<String,Schema>();
+ LinkedHashMap<String,Field> fields = new LinkedHashMap<String,Field>();
for (Map.Entry<String,Object> entry : record.entrySet())
- fields.put(entry.getKey(), induce(entry.getValue()));
+ fields.put(entry.getKey(), new Field(induce(entry.getValue()), null));
return Schema.createRecord(fields);
} else if (datum instanceof GenericArray) {
Schema elementType = null;
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Mon May 11 18:19:46 2009
@@ -19,8 +19,12 @@
import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.nio.ByteBuffer;
+
+import org.codehaus.jackson.map.JsonNode;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
@@ -29,6 +33,7 @@
import org.apache.avro.Schema.Type;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.ValueReader;
+import org.apache.avro.util.Utf8;
/** {@link DatumReader} for generic Java objects. */
public class GenericDatumReader<D> implements DatumReader<D> {
@@ -140,13 +145,19 @@
read(oldDatum,actualField.schema(),expectedField.schema(), in));
size++;
}
- if (expectedFields.size() > size) {
- // clear old fields (in expected, but not in actual)
+ if (expectedFields.size() > size) { // not all fields set
Set<String> actualFields = actual.getFields().keySet();
for (Map.Entry<String, Field> entry : expectedFields.entrySet()) {
- String f = entry.getKey();
- if (!actualFields.contains(f))
- removeField(record, f, entry.getValue().pos());
+ String fieldName = entry.getKey();
+ if (!actualFields.contains(fieldName)) { // an unset field
+ Field f = entry.getValue();
+ JsonNode json = f.defaultValue();
+ if (json != null) // has default
+ addField(record, fieldName, f.pos(), // add default
+ defaultFieldValue(old, f.schema(), json));
+ else if (old != null) // remove stale value
+ removeField(record, fieldName, entry.getValue().pos());
+ }
}
}
return record;
@@ -173,6 +184,53 @@
((GenericRecord) record).remove(field);
}
+ /** Called by the default implementation of {@link #readRecord} to construct
+ a default value for a field. */
+ protected Object defaultFieldValue(Object old, Schema schema, JsonNode json)
{
+ switch (schema.getType()) {
+ case RECORD:
+ Object record = newRecord(old, schema);
+ for (Map.Entry<String, Field> entry : schema.getFields().entrySet()) {
+ String name = entry.getKey();
+ Field f = entry.getValue();
+ JsonNode v = json.getFieldValue(name);
+ if (v == null) v = f.defaultValue();
+ if (v != null) {
+ Object o = old != null ? getField(old, name, f.pos()) : null;
+ addField(record, name, f.pos(), defaultFieldValue(o, f.schema(), v));
+ } else if (old != null) {
+ removeField(record, name, f.pos());
+ }
+ }
+ return record;
+ case ARRAY:
+ Object array = newArray(old, json.size());
+ Schema element = schema.getElementType();
+ for (JsonNode node : json)
+ addToArray(array, defaultFieldValue(peekArray(array), element, node));
+ return array;
+ case MAP:
+ Object map = newMap(old, json.size());
+ Schema value = schema.getValueType();
+ for (Iterator<String> i = json.getFieldNames(); i.hasNext();) {
+ String key = i.next();
+ addToMap(map, new Utf8(key),
+ defaultFieldValue(null, value, json.getFieldValue(key)));
+ }
+ return map;
+ case UNION: return defaultFieldValue(old, schema.getTypes().get(0),
json);
+ case STRING: return createString(json.getTextValue());
+ case BYTES: return createBytes(json.getTextValue().getBytes());
+ case INT: return json.getIntValue();
+ case LONG: return json.getLongValue();
+ case FLOAT: return (float)json.getDoubleValue();
+ case DOUBLE: return json.getDoubleValue();
+ case BOOLEAN: return json.getBooleanValue();
+ case NULL: return null;
+ default: throw new AvroRuntimeException("Unknown type: "+actual);
+ }
+ }
+
/** Called to read an array instance. May be overridden for alternate array
* representations.*/
@SuppressWarnings(value="unchecked")
@@ -271,6 +329,11 @@
return in.readUtf8(old);
}
+ /** Called to create a string from a default value. Subclasses may override
+ * to use a different string representation. By default, this calls
+ * {@link Utf8#Utf8(String)}.*/
+ protected Object createString(String value) { return new Utf8(value); }
+
/** Called to read byte arrays. Subclasses may override to use a different
* byte array representation. By default, this calls {@link
* ValueReader#readBuffer(Object)}.*/
@@ -278,6 +341,11 @@
return in.readBuffer(old);
}
+ /** Called to create byte arrays from default values. Subclasses may
+ * override to use a different byte array representation. By default, this
+ * calls {@link ByteBuffer#wrap(byte[])}.*/
+ protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); }
+
private static final Schema STRING_SCHEMA = Schema.create(Type.STRING);
/** Skip an instance of a schema. */
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java Mon May 11 18:19:46 2009
@@ -147,15 +147,17 @@
String name = c.getSimpleName(); // FIXME: ignoring package
Schema schema = names.get(name);
if (schema == null) {
- Map<String,Schema> fields = new LinkedHashMap<String,Schema>();
+ LinkedHashMap<String,Schema.Field> fields =
+ new LinkedHashMap<String,Schema.Field>();
schema = Schema.createRecord(name, c.getPackage().getName(),
Throwable.class.isAssignableFrom(c));
if (!names.containsKey(name))
names.put(name, schema);
for (Field field : c.getDeclaredFields())
- if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0)
- fields.put(field.getName(),
- createSchema(field.getGenericType(), names));
+ if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0) {
+ Schema fieldSchema = createSchema(field.getGenericType(), names);
+ fields.put(field.getName(), new Schema.Field(fieldSchema, null));
+ }
schema.setFields(fields);
}
return schema;
@@ -183,11 +185,13 @@
private static Message getMessage(Method method, Protocol protocol) {
Map<String,Schema> names = protocol.getTypes();
- Map<String,Schema> fields = new LinkedHashMap<String,Schema>();
+ LinkedHashMap<String,Schema.Field> fields =
+ new LinkedHashMap<String,Schema.Field>();
String[] paramNames = PARANAMER.lookupParameterNames(method);
java.lang.reflect.Type[] paramTypes = method.getGenericParameterTypes();
for (int i = 0; i < paramTypes.length; i++)
- fields.put(paramNames[i], createSchema(paramTypes[i], names));
+ fields.put(paramNames[i],
+ new Schema.Field(createSchema(paramTypes[i], names), null));
Schema request = Schema.createRecord(fields);
Schema response = createSchema(method.getGenericReturnType(), names);
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java?rev=773638&r1=773637&r2=773638&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java Mon May 11 18:19:46 2009
@@ -18,11 +18,15 @@
package org.apache.avro;
import java.io.*;
+import java.util.*;
+import java.nio.ByteBuffer;
import org.codehaus.jackson.map.JsonNode;
import junit.framework.TestCase;
import org.apache.avro.io.*;
+import org.apache.avro.util.Utf8;
import org.apache.avro.generic.*;
+import org.apache.avro.Schema.*;
public class TestSchema extends TestCase {
@@ -30,48 +34,56 @@
Integer.parseInt(System.getProperty("test.count", "10"));
public void testNull() throws Exception {
- check("\"null\"");
+ check("\"null\"", "null", null);
}
public void testBoolean() throws Exception {
- check("\"boolean\"");
+ check("\"boolean\"", "true", Boolean.TRUE);
}
public void testString() throws Exception {
- check("\"string\"");
+ check("\"string\"", "\"foo\"", new Utf8("foo"));
}
public void testBytes() throws Exception {
- check("\"bytes\"");
+ check("\"bytes\"", "\"\"", ByteBuffer.allocate(0));
}
public void testInt() throws Exception {
- check("\"int\"");
+ check("\"int\"", "9", new Integer(9));
}
public void testLong() throws Exception {
- check("\"long\"");
+ check("\"long\"", "11", new Long(11));
}
public void testFloat() throws Exception {
- check("\"float\"");
+ check("\"float\"", "1.1", new Float(1.1));
}
public void testDouble() throws Exception {
- check("\"double\"");
+ check("\"double\"", "1.2", new Double(1.2));
}
public void testArray() throws Exception {
- check("{\"type\":\"array\", \"items\": \"long\"}");
+ GenericArray<Long> array = new GenericData.Array<Long>(1);
+ array.add(1L);
+ check("{\"type\":\"array\", \"items\": \"long\"}", "[1]", array);
}
public void testMap() throws Exception {
- check("{\"type\":\"map\", \"values\": \"string\"}");
+ HashMap<Utf8,Long> map = new HashMap<Utf8,Long>();
+ map.put(new Utf8("a"), 1L);
+ check("{\"type\":\"map\", \"values\":\"long\"}", "{\"a\":1}", map);
}
public void testRecord() throws Exception {
- check("{\"type\":\"record\",\"fields\":["
- +"{\"name\":\"f\", \"type\":\"string\"}]}");
+ String recordJson =
+ "{\"type\":\"record\",\"fields\":[{\"name\":\"f\", \"type\":\"long\"}]}";
+ Schema schema = Schema.parse(recordJson);
+ GenericData.Record record = new GenericData.Record(schema);
+ record.put("f", 11L);
+ check(recordJson, "{\"f\":11}", record);
}
public void testRecursive() throws Exception {
@@ -93,6 +105,13 @@
public void testUnion() throws Exception {
check("[\"string\", \"long\"]", false);
+ checkDefault("[\"double\", \"long\"]", "1.1", new Double(1.1));
+ }
+
+ private static void check(String schemaJson, String defaultJson,
+ Object defaultValue) throws Exception {
+ check(schemaJson, true);
+ checkDefault(schemaJson, defaultJson, defaultValue);
}
private static void check(String jsonSchema) throws Exception {
@@ -139,4 +158,22 @@
assertEquals("Decoded data does not match.", datum, decoded);
}
+ private static final Schema ACTUAL = // an empty record schema
+ Schema.createRecord(new LinkedHashMap<String,Field>());
+
+ @SuppressWarnings(value="unchecked")
+ private static void checkDefault(String schemaJson, String defaultJson,
+ Object defaultValue) throws Exception {
+ String recordJson = "{\"type\":\"record\",\"fields\":[{\"name\":\"f\", "
+ +"\"type\":"+schemaJson+", "
+ +"\"default\":"+defaultJson+"}]}";
+ Schema expected = Schema.parse(recordJson);
+ DatumReader in = new GenericDatumReader(ACTUAL, expected);
+ GenericData.Record record = (GenericData.Record)
+ in.read(null, new ValueReader(new ByteArrayInputStream(new byte[0])));
+ assertEquals("Wrong default.", defaultValue, record.get("f"));
+ }
+
+
+
}