Author: cutting
Date: Wed Nov 11 23:53:00 2009
New Revision: 835165
URL: http://svn.apache.org/viewvc?rev=835165&view=rev
Log:
AVRO-185. Java's specific API no longer depends on reflection.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/specific/FixedSize.java
- copied, changed from r834652,
hadoop/avro/trunk/src/java/org/apache/avro/reflect/FixedSize.java
Removed:
hadoop/avro/trunk/src/java/org/apache/avro/reflect/FixedSize.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/build.xml
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificFixed.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificResponder.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Nov 11 23:53:00 2009
@@ -4,6 +4,10 @@
INCOMPATIBLE CHANGES
+ AVRO-185. Java's specific API no longer depends on reflection.
+ This reverses the inheritance of most classes in the specific and
+ reflect packages. (cutting)
+
NEW FEATURES
AVRO-151. Validating Avro schema parser for C (massie)
Modified: hadoop/avro/trunk/build.xml
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Wed Nov 11 23:53:00 2009
@@ -237,6 +237,13 @@
<java-compiler src="${test.java.src.dir}"
dest="${test.java.classes}"
classpath="test.java.classpath"/>
+ <taskdef
+ name="paranamer"
+ classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
+ <classpath refid="java.classpath" />
+ </taskdef>
+ <paranamer sourceDirectory="${test.java.generated.dir}"
+ outputDirectory="${test.java.generated.classes}"/>
</target>
<macrodef name="java-avro-compiler">
@@ -253,11 +260,6 @@
<taskdef name="schema" classname="org.apache.avro.specific.SchemaTask">
<classpath refid="java.classpath" />
</taskdef>
- <taskdef
- name="paranamer"
- classname="com.thoughtworks.paranamer.ant.ParanamerGeneratorTask">
- <classpath refid="java.classpath" />
- </taskdef>
<mkdir dir="@{generated}"/>
@@ -275,7 +277,6 @@
<java-compiler src="@{generated}" dest="@{dest}"
classpath="@{classpath}"/>
- <paranamer sourceDirectory="@{generated}" outputDirectory="@{dest}"/>
</sequential>
</macrodef>
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericRequestor.java
Wed Nov 11 23:53:00 2009
@@ -35,15 +35,18 @@
super(protocol, transceiver);
}
+ @Override
public void writeRequest(Schema schema, Object request, Encoder out)
throws IOException {
new GenericDatumWriter<Object>(schema).write(request, out);
}
+ @Override
public Object readResponse(Schema schema, Decoder in) throws IOException {
return new GenericDatumReader<Object>(schema).read(null, in);
}
+ @Override
public AvroRemoteException readError(Schema schema, Decoder in)
throws IOException {
return new AvroRemoteException(new
GenericDatumReader<Object>(schema).read(null,in));
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericResponder.java
Wed Nov 11 23:53:00 2009
@@ -34,18 +34,18 @@
super(local);
}
- /** Reads a request message. */
+ @Override
public Object readRequest(Schema schema, Decoder in) throws IOException {
return new GenericDatumReader<Object>(schema).read(null, in);
}
- /** Writes a response message. */
+ @Override
public void writeResponse(Schema schema, Object response, Encoder out)
throws IOException {
new GenericDatumWriter<Object>(schema).write(response, out);
}
- /** Writes an error message. */
+ @Override
public void writeError(Schema schema, AvroRemoteException error,
Encoder out) throws IOException {
new GenericDatumWriter<Object>(schema).write(error.getValue(), out);
Modified: hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectData.java Wed Nov
11 23:53:00 2009
@@ -20,34 +20,28 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.nio.ByteBuffer;
+import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.WeakHashMap;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.AvroTypeException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
-import org.apache.avro.Protocol.Message;
import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericArray;
+import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.FixedSize;
import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.avro.util.Utf8;
import com.thoughtworks.paranamer.CachingParanamer;
import com.thoughtworks.paranamer.Paranamer;
/** Utilities to use existing Java classes and interfaces via reflection. */
-public class ReflectData extends GenericData {
+public class ReflectData extends SpecificData {
/** {@link ReflectData} implementation that permits null field values. The
* schema generated for each field is a union of its declared type and
@@ -86,12 +80,6 @@
}
@Override
- protected boolean isEnum(Object datum) {
- return datum instanceof Enum;
- }
-
- /** Returns true if an object matches a schema. */
- @Override
public boolean validate(Schema schema, Object datum) {
switch (schema.getType()) {
case RECORD:
@@ -107,30 +95,8 @@
}
}
return true;
- case ENUM:
- return datum instanceof Enum
- && schema.getEnumSymbols().contains(((Enum)datum).name());
- case ARRAY:
- if (!(datum instanceof GenericArray)) return false;
- for (Object element : (GenericArray)datum)
- if (!validate(schema.getElementType(), element))
- return false;
- return true;
- case UNION:
- for (Schema type : schema.getTypes())
- if (validate(type, datum))
- return true;
- return false;
- case FIXED: return datum instanceof GenericFixed;
- case STRING: return datum instanceof Utf8;
- case BYTES: return datum instanceof ByteBuffer;
- case INT: return datum instanceof Integer;
- case LONG: return datum instanceof Long;
- case FLOAT: return datum instanceof Float;
- case DOUBLE: return datum instanceof Double;
- case BOOLEAN: return datum instanceof Boolean;
- case NULL: return datum == null;
- default: return false;
+ default:
+ return super.validate(schema, datum);
}
}
@@ -144,152 +110,42 @@
}
}
- private Map<String,Class> classCache = new ConcurrentHashMap<String,Class>();
-
- /** Return the class that implements this schema. */
- public Class getClass(Schema schema) {
- switch (schema.getType()) {
- case FIXED:
- case RECORD:
- case ENUM:
- String full = schema.getFullName();
- Class c = classCache.get(full);
- if (c == null) {
- try {
- c = Class.forName(getClassName(schema));
- classCache.put(full, c);
- } catch (ClassNotFoundException e) {
- throw new AvroRuntimeException(e);
- }
- }
- return c;
- case ARRAY: return GenericArray.class;
- case MAP: return Map.class;
- case UNION: return Object.class;
- case STRING: return Utf8.class;
- case BYTES: return ByteBuffer.class;
- case INT: return Integer.TYPE;
- case LONG: return Long.TYPE;
- case FLOAT: return Float.TYPE;
- case DOUBLE: return Double.TYPE;
- case BOOLEAN: return Boolean.TYPE;
- case NULL: return Void.TYPE;
- default: throw new AvroRuntimeException("Unknown type: "+schema);
- }
-
- }
-
- /** Returns the Java class name indicated by a schema's name and namespace.
*/
- public String getClassName(Schema schema) {
- String namespace = schema.getNamespace();
- String name = schema.getName();
- if (namespace == null)
- return name;
- String dot = namespace.endsWith("$") ? "" : ".";
- return namespace + dot + name;
- }
-
- private final WeakHashMap<java.lang.reflect.Type,Schema> schemaCache =
- new WeakHashMap<java.lang.reflect.Type,Schema>();
-
- /** Generate a schema for a Java type.
- * <p>For records, {@link Class#getDeclaredFields() declared fields} (not
- * inherited) which are not static or transient are used.</p>
- * <p>Note that unions cannot be automatically generated by this method,
- * since Java provides no representation for unions.</p>
- */
- public Schema getSchema(java.lang.reflect.Type type) {
- Schema schema = schemaCache.get(type);
- if (schema == null) {
- schema = createSchema(type, new LinkedHashMap<String,Schema>());
- schemaCache.put(type, schema);
- }
- return schema;
- }
-
- /**
- * Create a schema for a type and it's fields. Note that by design only
fields
- * of the direct class, not it's super classes, are used for creating the
- * schema. Also, fields are not permitted to be null.
- */
+ /** Create a schema for a Java class. Note that by design only fields of the
+ * direct class, not its super classes, are used for creating a record
+ * schema. Also, fields are not permitted to be null. {@link
+ * Class#getDeclaredFields() Declared fields} (not inherited) which are not
+ * static or transient are used.*/
+ @Override
@SuppressWarnings(value="unchecked")
- protected Schema createSchema(java.lang.reflect.Type type,
- Map<String,Schema> names) {
- if (type == Utf8.class)
- return Schema.create(Type.STRING);
- else if (type == ByteBuffer.class)
- return Schema.create(Type.BYTES);
- else if ((type == Integer.class) || (type == Integer.TYPE))
- return Schema.create(Type.INT);
- else if ((type == Long.class) || (type == Long.TYPE))
- return Schema.create(Type.LONG);
- else if ((type == Float.class) || (type == Float.TYPE))
- return Schema.create(Type.FLOAT);
- else if ((type == Double.class) || (type == Double.TYPE))
- return Schema.create(Type.DOUBLE);
- else if ((type == Boolean.class) || (type == Boolean.TYPE))
- return Schema.create(Type.BOOLEAN);
- else if ((type == Void.class) || (type == Void.TYPE))
- return Schema.create(Type.NULL);
- else if (type instanceof ParameterizedType) {
- ParameterizedType ptype = (ParameterizedType)type;
- Class raw = (Class)ptype.getRawType();
- java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
- for (int i = 0; i < params.length; i++)
- if (GenericArray.class.isAssignableFrom(raw)) { // array
- if (params.length != 1)
- throw new AvroTypeException("No array type specified.");
- return Schema.createArray(createSchema(params[0], names));
- } else if (Map.class.isAssignableFrom(raw)) { // map
- java.lang.reflect.Type key = params[0];
- java.lang.reflect.Type value = params[1];
- if (!(key == Utf8.class))
- throw new AvroTypeException("Map key class not Utf8: "+key);
- return Schema.createMap(createSchema(value, names));
- }
- } else if (type instanceof Class) {
- Class c = (Class)type;
- String name = c.getSimpleName();
- String space = c.getPackage().getName();
- if (c.getEnclosingClass() != null) // nested class
- space = c.getEnclosingClass().getName() + "$";
- String fullName = c.getName();
- Schema schema = names.get(fullName);
- if (schema == null) {
-
- if (c.isEnum()) { // enum
- List<String> symbols = new ArrayList<String>();
- Enum[] constants = (Enum[])c.getEnumConstants();
- for (int i = 0; i < constants.length; i++)
- symbols.add(constants[i].name());
- schema = Schema.createEnum(name, space, symbols);
- names.put(fullName, schema);
- return schema;
- }
- // fixed
- if (GenericFixed.class.isAssignableFrom(c)) {
- int size = ((FixedSize)c.getAnnotation(FixedSize.class)).value();
- schema = Schema.createFixed(name, space, size);
- names.put(fullName, schema);
- return schema;
+ protected Schema createClassSchema(Class c, Map<String,Schema> names) {
+ String name = c.getSimpleName();
+ String space = c.getPackage().getName();
+ if (c.getEnclosingClass() != null) // nested class
+ space = c.getEnclosingClass().getName() + "$";
+ Schema schema;
+ if (c.isEnum()) { // enum
+ List<String> symbols = new ArrayList<String>();
+ Enum[] constants = (Enum[])c.getEnumConstants();
+ for (int i = 0; i < constants.length; i++)
+ symbols.add(constants[i].name());
+ schema = Schema.createEnum(name, space, symbols);
+ } else if (GenericFixed.class.isAssignableFrom(c)) { // fixed
+ int size = ((FixedSize)c.getAnnotation(FixedSize.class)).value();
+ schema = Schema.createFixed(name, space, size);
+ } else { // record
+ LinkedHashMap<String,Schema.Field> fields =
+ new LinkedHashMap<String,Schema.Field>();
+ schema = Schema.createRecord(name, space,
+ Throwable.class.isAssignableFrom(c));
+ names.put(c.getName(), schema);
+ for (Field field : c.getDeclaredFields())
+ if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0) {
+ Schema fieldSchema = createFieldSchema(field, names);
+ fields.put(field.getName(), new Schema.Field(fieldSchema, null));
}
- // record
- LinkedHashMap<String,Schema.Field> fields =
- new LinkedHashMap<String,Schema.Field>();
- schema = Schema.createRecord(name, space,
- Throwable.class.isAssignableFrom(c));
- if (!names.containsKey(fullName))
- names.put(fullName, schema);
- for (Field field : c.getDeclaredFields())
- if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0) {
- Schema fieldSchema = createFieldSchema(field, names);
- fields.put(field.getName(), new Schema.Field(fieldSchema, null));
- }
- schema.setFields(fields);
- }
- return schema;
+ schema.setFields(fields);
}
- throw new AvroTypeException("Unknown type: "+type);
+ return schema;
}
/** Create a schema for a field. */
@@ -297,12 +153,12 @@
return createSchema(field.getGenericType(), names);
}
- /** Generate a protocol for a Java interface.
+ /** Return the protocol for a Java interface.
* <p>Note that this requires that <a
* href="http://paranamer.codehaus.org/">Paranamer</a> is run over compiled
* interface declarations, since Java 6 reflection does not provide access to
- * method parameter names. See Avro's build.xml for an example. </p>
- */
+ * method parameter names. See Avro's build.xml for an example. */
+ @Override
public Protocol getProtocol(Class iface) {
Protocol protocol =
new Protocol(iface.getSimpleName(), iface.getPackage().getName());
@@ -346,4 +202,10 @@
return protocol.createMessage(method.getName(), request, response, errors);
}
+ @Override
+ public int compare(Object o1, Object o2, Schema s) {
+ throw new UnsupportedOperationException();
+ }
+
+
}
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumReader.java
Wed Nov 11 23:53:00 2009
@@ -17,19 +17,15 @@
*/
package org.apache.avro.reflect;
-import java.lang.reflect.Constructor;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
/**
* {@link org.apache.avro.io.DatumReader DatumReader} for existing classes via
* Java reflection.
*/
-public class ReflectDatumReader extends GenericDatumReader<Object> {
+public class ReflectDatumReader extends SpecificDatumReader {
public ReflectDatumReader() {}
public ReflectDatumReader(Class c) {
@@ -37,14 +33,10 @@
}
public ReflectDatumReader(Schema root) {
- setSchema(root);
- }
-
- protected Object newRecord(Object old, Schema schema) {
- Class c = ReflectData.get().getClass(schema);
- return (c.isInstance(old) ? old : newInstance(c));
+ super(root);
}
+ @Override
protected void addField(Object record, String name, int position, Object o) {
try {
ReflectData.getField(record.getClass(), name).set(record, o);
@@ -53,6 +45,7 @@
}
}
+ @Override
protected Object getField(Object record, String name, int position) {
try {
return ReflectData.getField(record.getClass(), name).get(record);
@@ -61,41 +54,10 @@
}
}
+ @Override
protected void removeField(Object record, String name, int position) {
addField(record, name, position, null);
}
- @SuppressWarnings("unchecked")
- protected Object createEnum(String symbol, Schema schema) {
- return Enum.valueOf(ReflectData.get().getClass(schema), symbol);
- }
-
- protected Object createFixed(Object old, Schema schema) {
- Class c = ReflectData.get().getClass(schema);
- return c.isInstance(old) ? old : newInstance(c);
- }
-
- private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
- private static final Map<Class,Constructor> CTOR_CACHE =
- new ConcurrentHashMap<Class,Constructor>();
-
- /** Create a new instance of the named class. */
- @SuppressWarnings("unchecked")
- protected static Object newInstance(Class c) {
- Object result;
- try {
- Constructor meth = (Constructor)CTOR_CACHE.get(c);
- if (meth == null) {
- meth = c.getDeclaredConstructor(EMPTY_ARRAY);
- meth.setAccessible(true);
- CTOR_CACHE.put(c, meth);
- }
- result = meth.newInstance();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return result;
- }
-
}
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java
Wed Nov 11 23:53:00 2009
@@ -17,18 +17,15 @@
*/
package org.apache.avro.reflect;
-import java.io.IOException;
-
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.Encoder;
+import org.apache.avro.specific.SpecificDatumWriter;
/**
* {@link org.apache.avro.io.DatumWriter DatumWriter} for existing classes
* via Java reflection.
*/
-public class ReflectDatumWriter extends GenericDatumWriter<Object> {
+public class ReflectDatumWriter extends SpecificDatumWriter {
public ReflectDatumWriter() {
this(ReflectData.get());
}
@@ -45,14 +42,15 @@
this(root, ReflectData.get());
}
- public ReflectDatumWriter(Schema root, ReflectData reflectData) {
+ protected ReflectDatumWriter(Schema root, ReflectData reflectData) {
super(root, reflectData);
}
- public ReflectDatumWriter(ReflectData reflectData) {
+ protected ReflectDatumWriter(ReflectData reflectData) {
super(reflectData);
}
+ @Override
protected Object getField(Object record, String name, int position) {
try {
return ReflectData.getField(record.getClass(), name).get(record);
@@ -61,14 +59,5 @@
}
}
- protected void writeEnum(Schema schema, Object datum, Encoder out)
- throws IOException {
- out.writeEnum(((Enum)datum).ordinal());
- }
-
- protected boolean isEnum(Object datum) {
- return datum instanceof Enum;
- }
-
}
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectRequestor.java
Wed Nov 11 23:53:00 2009
@@ -19,74 +19,38 @@
package org.apache.avro.reflect;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.util.Map;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.avro.ipc.Requestor;
+import org.apache.avro.specific.SpecificRequestor;
import org.apache.avro.ipc.Transceiver;
-/** A {@link Requestor} for existing interfaces via Java reflection. */
-public class ReflectRequestor extends Requestor implements InvocationHandler {
+/** A {@link org.apache.avro.ipc.Requestor} for existing interfaces. */
+public class ReflectRequestor extends SpecificRequestor {
public ReflectRequestor(Class<?> iface, Transceiver transceiver)
throws IOException {
- this(iface, transceiver, ReflectData.get());
+ this(ReflectData.get().getProtocol(iface), transceiver);
}
protected ReflectRequestor(Protocol protocol, Transceiver transceiver)
throws IOException {
- this(protocol, transceiver, ReflectData.get());
- }
-
- public ReflectRequestor(Class<?> iface, Transceiver transceiver, ReflectData
reflectData)
- throws IOException {
- this(reflectData.getProtocol(iface), transceiver, reflectData);
- }
-
- protected ReflectRequestor(Protocol protocol, Transceiver transceiver,
ReflectData reflectData)
- throws IOException {
super(protocol, transceiver);
}
-
- public Object invoke(Object proxy, Method method, Object[] args)
- throws Throwable {
- return request(method.getName(), args);
- }
+ @Override
protected DatumWriter<Object> getDatumWriter(Schema schema) {
return new ReflectDatumWriter(schema);
}
+ @Override
protected DatumReader<Object> getDatumReader(Schema schema) {
return new ReflectDatumReader(schema);
}
- public void writeRequest(Schema schema, Object request, Encoder out)
- throws IOException {
- Object[] args = (Object[])request;
- int i = 0;
- for (Map.Entry<String, Schema> param : schema.getFieldSchemas())
- getDatumWriter(param.getValue()).write(args[i++], out);
- }
-
- public Object readResponse(Schema schema, Decoder in) throws IOException {
- return getDatumReader(schema).read(null, in);
- }
-
- public AvroRemoteException readError(Schema schema, Decoder in)
- throws IOException {
- return (AvroRemoteException)getDatumReader(schema).read(null, in);
- }
-
/** Create a proxy instance whose methods invoke RPCs. */
public static Object getClient(Class<?> iface, Transceiver transciever)
throws IOException {
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectResponder.java
Wed Nov 11 23:53:00 2009
@@ -18,83 +18,31 @@
package org.apache.avro.reflect;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
-import org.apache.avro.Protocol.Message;
+import org.apache.avro.Protocol;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.ipc.AvroRemoteException;
-import org.apache.avro.ipc.Responder;
-
-/** {@link Responder} for existing interfaces via Java reflection.*/
-public class ReflectResponder extends Responder {
- private Object impl;
+import org.apache.avro.specific.SpecificResponder;
+/** {@link org.apache.avro.ipc.Responder} for existing interfaces.*/
+public class ReflectResponder extends SpecificResponder {
public ReflectResponder(Class iface, Object impl) {
- this(iface, impl, ReflectData.get());
+ this(ReflectData.get().getProtocol(iface), impl);
}
- public ReflectResponder(Class iface, Object impl, ReflectData reflectData) {
- super(reflectData.getProtocol(iface));
- this.impl = impl;
+ public ReflectResponder(Protocol protocol, Object impl) {
+ super(protocol, impl);
}
+ @Override
protected DatumWriter<Object> getDatumWriter(Schema schema) {
return new ReflectDatumWriter(schema);
}
+ @Override
protected DatumReader<Object> getDatumReader(Schema schema) {
return new ReflectDatumReader(schema);
}
- /** Reads a request message. */
- public Object readRequest(Schema schema, Decoder in) throws IOException {
- Object[] args = new Object[schema.getFields().size()];
- int i = 0;
- for (Map.Entry<String, Schema> param : schema.getFieldSchemas())
- args[i++] = getDatumReader(param.getValue()).read(null, in);
- return args;
- }
-
- /** Writes a response message. */
- public void writeResponse(Schema schema, Object response, Encoder out)
- throws IOException {
- getDatumWriter(schema).write(response, out);
- }
-
- /** Writes an error message. */
- public void writeError(Schema schema, AvroRemoteException error,
- Encoder out) throws IOException {
- getDatumWriter(schema).write(error, out);
- }
-
- public Object respond(Message message, Object request)
- throws AvroRemoteException {
- Class[] paramTypes = new Class[message.getRequest().getFields().size()];
- int i = 0;
- try {
- for (Map.Entry<String,Schema> param:
message.getRequest().getFieldSchemas())
- paramTypes[i++] = ReflectData.get().getClass(param.getValue());
- Method method = impl.getClass().getMethod(message.getName(), paramTypes);
- return method.invoke(impl, (Object[])request);
- } catch (InvocationTargetException e) {
- Throwable target = e.getTargetException();
- if (target instanceof AvroRemoteException)
- throw (AvroRemoteException)target;
- else throw new AvroRuntimeException(e);
- } catch (NoSuchMethodException e) {
- throw new AvroRuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new AvroRuntimeException(e);
- }
- }
-
}
Copied: hadoop/avro/trunk/src/java/org/apache/avro/specific/FixedSize.java
(from r834652,
hadoop/avro/trunk/src/java/org/apache/avro/reflect/FixedSize.java)
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/FixedSize.java?p2=hadoop/avro/trunk/src/java/org/apache/avro/specific/FixedSize.java&p1=hadoop/avro/trunk/src/java/org/apache/avro/reflect/FixedSize.java&r1=834652&r2=835165&rev=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/reflect/FixedSize.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/FixedSize.java Wed Nov
11 23:53:00 2009
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.avro.reflect;
+package org.apache.avro.specific;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificCompiler.java
Wed Nov 11 23:53:00 2009
@@ -151,8 +151,8 @@
StringBuilder out = new StringBuilder();
header(out, protocol.getNamespace());
line(out, 0, "public interface "+protocol.getName()+" {");
-
- out.append("\n");
+ line(out, 1, "public static final Protocol _PROTOCOL = Protocol.parse(\""
+ +esc(protocol)+"\");");
for (Map.Entry<String,Message> e : protocol.getMessages().entrySet()) {
String name = e.getKey();
Message message = e.getValue();
@@ -193,7 +193,7 @@
line(out, 0, "import org.apache.avro.specific.SpecificRecordBase;");
line(out, 0, "import org.apache.avro.specific.SpecificRecord;");
line(out, 0, "import org.apache.avro.specific.SpecificFixed;");
- line(out, 0, "import org.apache.avro.reflect.FixedSize;");
+ line(out, 0, "import org.apache.avro.specific.FixedSize;");
for (Schema s : queue)
if (namespace == null
? (s.getNamespace() != null)
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java Wed
Nov 11 23:53:00 2009
@@ -19,14 +19,24 @@
import java.util.Iterator;
import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedHashMap;
+import java.nio.ByteBuffer;
+import java.lang.reflect.ParameterizedType;
import org.apache.avro.Schema;
+import org.apache.avro.Protocol;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema.Field;
-import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
/** Utilities for generated Java classes and interfaces. */
-public class SpecificData extends ReflectData {
+public class SpecificData extends GenericData {
private static final SpecificData INSTANCE = new SpecificData();
@@ -36,36 +46,165 @@
public static SpecificData get() { return INSTANCE; }
@Override
- protected Schema createSchema(java.lang.reflect.Type type,
- Map<String,Schema> names) {
- if (type instanceof Class) {
- Class c = (Class)type;
- String name = c.getSimpleName();
- Schema schema = names.get(name);
- if (schema != null) return schema;
- if (SpecificRecord.class.isAssignableFrom(c)) {
+ /** A datum is treated as a record when it implements {@link SpecificRecord}. */
+ protected boolean isRecord(Object datum) {
+   return SpecificRecord.class.isInstance(datum);
+ }
+
+ /** Returns the schema reported by the record itself, which must be a
+  * {@link SpecificRecord}. */
+ @Override
+ protected Schema getRecordSchema(Object record) {
+   SpecificRecord specific = (SpecificRecord)record;
+   return specific.getSchema();
+ }
+
+ /** A datum is treated as an enum when it is a Java {@link Enum} constant. */
+ @Override
+ protected boolean isEnum(Object datum) {
+   return Enum.class.isInstance(datum);
+ }
+
+ /**
+  * Returns true if a datum is a valid instance of a schema.
+  * <p>Records must be {@link SpecificRecord}s whose fields, read by position
+  * in schema field order, each validate against the corresponding field
+  * schema.  Enums must be Java {@link Enum}s whose name is one of the schema's
+  * symbols.  Every other schema type is delegated to the superclass.
+  */
+ @Override
+ public boolean validate(Schema schema, Object datum) {
+   switch (schema.getType()) {
+   case RECORD:
+     // instanceof is false for null, so a null datum is rejected here rather
+     // than raising a NullPointerException.
+     if (!(datum instanceof SpecificRecord)) return false;
+     SpecificRecord record = (SpecificRecord)datum;
+     Iterator<Field> fields = schema.getFields().values().iterator();
+     for (int i = 0; fields.hasNext(); i++)
+       if (!validate(fields.next().schema(), record.get(i)))
+         return false;
+     return true;
+   case ENUM:
+     return datum instanceof Enum
+       && schema.getEnumSymbols().contains(((Enum)datum).name());
+   default:
+     return super.validate(schema, datum);
+   }
+ }
+
+ private Map<String,Class> classCache = new ConcurrentHashMap<String,Class>();
+
+ /** Return the class that implements a schema. */
+ public Class getClass(Schema schema) {
+ switch (schema.getType()) {
+ case FIXED:
+ case RECORD:
+ case ENUM:
+ String name = schema.getFullName();
+ Class c = classCache.get(name);
+ if (c == null) {
try {
- schema = (Schema)((Class)type).getDeclaredField("_SCHEMA").get(null);
- } catch (NoSuchFieldException e) {
- throw new AvroRuntimeException(e);
- } catch (IllegalAccessException e) {
+ c = Class.forName(getClassName(schema));
+ classCache.put(name, c);
+ } catch (ClassNotFoundException e) {
throw new AvroRuntimeException(e);
}
- names.put(name, schema);
- return schema;
}
+ return c;
+ case ARRAY: return GenericArray.class;
+ case MAP: return Map.class;
+ case UNION: return Object.class;
+ case STRING: return Utf8.class;
+ case BYTES: return ByteBuffer.class;
+ case INT: return Integer.TYPE;
+ case LONG: return Long.TYPE;
+ case FLOAT: return Float.TYPE;
+ case DOUBLE: return Double.TYPE;
+ case BOOLEAN: return Boolean.TYPE;
+ case NULL: return Void.TYPE;
+ default: throw new AvroRuntimeException("Unknown type: "+schema);
}
- return super.createSchema(type, names);
}
- @Override
- protected boolean isRecord(Object datum) {
- return datum instanceof SpecificRecord;
+ /** Builds the Java class name for a schema from its namespace and name.
+  * With no namespace the bare name is returned.  Otherwise namespace and name
+  * are joined with a '.', unless the namespace already ends in '$'
+  * (presumably an enclosing-class prefix), in which case they are joined
+  * directly. */
+ public String getClassName(Schema schema) {
+   String namespace = schema.getNamespace();
+   String name = schema.getName();
+   if (namespace == null)
+     return name;
+   StringBuilder className = new StringBuilder(namespace);
+   if (!namespace.endsWith("$"))
+     className.append(".");
+   return className.append(name).toString();
}
- @Override
- protected Schema getRecordSchema(Object record) {
- return ((SpecificRecord)record).getSchema();
+ /** Schemas already computed by {@link #getSchema}, keyed by Java type. */
+ private final WeakHashMap<java.lang.reflect.Type,Schema> schemaCache =
+   new WeakHashMap<java.lang.reflect.Type,Schema>();
+
+ /** Find the schema for a Java type, creating and caching it on first use. */
+ public Schema getSchema(java.lang.reflect.Type type) {
+   Schema cached = schemaCache.get(type);
+   if (cached != null)
+     return cached;
+   Schema created = createSchema(type, new LinkedHashMap<String,Schema>());
+   schemaCache.put(type, created);
+   return created;
+ }
+
+ /**
+  * Create the schema for a Java type.
+  * <ul>
+  * <li>{@link Utf8}, {@link ByteBuffer} and the boxed or primitive numeric,
+  * boolean and void types map to the corresponding primitive schema.</li>
+  * <li>A parameterized {@link GenericArray} maps to an array schema of its
+  * single type argument.</li>
+  * <li>A parameterized {@link Map} with {@link Utf8} keys maps to a map schema
+  * of its value type.</li>
+  * <li>Any other class is resolved through {@link #createClassSchema} and
+  * recorded in <code>names</code> under its full class name.</li>
+  * </ul>
+  * @param type the Java type to translate
+  * @param names schemas already created, keyed by full class name
+  * @throws AvroTypeException if the type cannot be mapped to a schema
+  */
+ @SuppressWarnings(value="unchecked")
+ protected Schema createSchema(java.lang.reflect.Type type,
+                               Map<String,Schema> names) {
+   if (type == Utf8.class)
+     return Schema.create(Type.STRING);
+   else if (type == ByteBuffer.class)
+     return Schema.create(Type.BYTES);
+   else if ((type == Integer.class) || (type == Integer.TYPE))
+     return Schema.create(Type.INT);
+   else if ((type == Long.class) || (type == Long.TYPE))
+     return Schema.create(Type.LONG);
+   else if ((type == Float.class) || (type == Float.TYPE))
+     return Schema.create(Type.FLOAT);
+   else if ((type == Double.class) || (type == Double.TYPE))
+     return Schema.create(Type.DOUBLE);
+   else if ((type == Boolean.class) || (type == Boolean.TYPE))
+     return Schema.create(Type.BOOLEAN);
+   else if ((type == Void.class) || (type == Void.TYPE))
+     return Schema.create(Type.NULL);
+   else if (type instanceof ParameterizedType) {
+     ParameterizedType ptype = (ParameterizedType)type;
+     Class raw = (Class)ptype.getRawType();
+     java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
+     // Decided once from the raw type; no iteration over the type arguments
+     // is needed.
+     if (GenericArray.class.isAssignableFrom(raw)) { // array
+       if (params.length != 1)
+         throw new AvroTypeException("No array type specified.");
+       return Schema.createArray(createSchema(params[0], names));
+     } else if (Map.class.isAssignableFrom(raw)) { // map
+       // Guard the indexing below so a malformed type fails with a type
+       // error rather than an ArrayIndexOutOfBoundsException.
+       if (params.length != 2)
+         throw new AvroTypeException("No map type specified.");
+       java.lang.reflect.Type key = params[0];
+       java.lang.reflect.Type value = params[1];
+       if (key != Utf8.class)
+         throw new AvroTypeException("Map key class not Utf8: "+key);
+       return Schema.createMap(createSchema(value, names));
+     }
+   } else if (type instanceof Class) { // class
+     Class c = (Class)type;
+     String fullName = c.getName();
+     Schema schema = names.get(fullName);
+     if (schema == null)
+       schema = createClassSchema(c, names);
+     names.put(fullName, schema);
+     return schema;
+   }
+   throw new AvroTypeException("Unknown type: "+type);
+ }
+
+ /** Create a schema for a Java class by reading its static
+  * <code>_SCHEMA</code> field.  A missing or inaccessible field is reported
+  * as an {@link AvroRuntimeException}. */
+ protected Schema createClassSchema(Class c, Map<String,Schema> names) {
+   try {
+     java.lang.reflect.Field holder = c.getDeclaredField("_SCHEMA");
+     Object value = holder.get(null);
+     return (Schema)value;
+   } catch (NoSuchFieldException e) {
+     throw new AvroRuntimeException(e);
+   } catch (IllegalAccessException e) {
+     throw new AvroRuntimeException(e);
+   }
+ }
+
+ /** Return the protocol for a Java interface by reading its static
+  * <code>_PROTOCOL</code> field.  A missing or inaccessible field is reported
+  * as an {@link AvroRuntimeException}. */
+ public Protocol getProtocol(Class iface) {
+   try {
+     java.lang.reflect.Field holder = iface.getDeclaredField("_PROTOCOL");
+     Object value = holder.get(null);
+     return (Protocol)value;
+   } catch (NoSuchFieldException e) {
+     throw new AvroRuntimeException(e);
+   } catch (IllegalAccessException e) {
+     throw new AvroRuntimeException(e);
+   }
}
@Override
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
---
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
(original)
+++
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumReader.java
Wed Nov 11 23:53:00 2009
@@ -17,11 +17,15 @@
*/
package org.apache.avro.specific;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.generic.GenericDatumReader;
/** {@link org.apache.avro.io.DatumReader DatumReader} for generated Java
classes. */
-public class SpecificDatumReader extends ReflectDatumReader {
+public class SpecificDatumReader extends GenericDatumReader<Object> {
public SpecificDatumReader() {}
public SpecificDatumReader(Class c) {
@@ -32,15 +36,57 @@
super(schema);
}
+ /** Reuses <code>old</code> when it is already an instance of the class
+  * generated for the schema; otherwise instantiates that class. */
+ @Override
+ protected Object newRecord(Object old, Schema schema) {
+   Class recordClass = SpecificData.get().getClass(schema);
+   if (recordClass.isInstance(old))
+     return old;
+   return newInstance(recordClass);
+ }
+
+ @Override
protected void addField(Object record, String name, int position, Object o) {
((SpecificRecord)record).set(position, o);
}
+ @Override
protected Object getField(Object record, String name, int position) {
return ((SpecificRecord)record).get(position);
}
+ @Override
protected void removeField(Object record, String field, int position) {
((SpecificRecord)record).set(position, null);
}
+ /** Looks up the constant named <code>symbol</code> in the Java enum class
+  * that implements the schema. */
+ @Override
+ @SuppressWarnings("unchecked")
+ protected Object createEnum(String symbol, Schema schema) {
+   Class enumClass = SpecificData.get().getClass(schema);
+   return Enum.valueOf(enumClass, symbol);
+ }
+
+ /** Reuses <code>old</code> when it is already an instance of the class
+  * generated for the fixed schema; otherwise instantiates that class. */
+ @Override
+ protected Object createFixed(Object old, Schema schema) {
+   Class fixedClass = SpecificData.get().getClass(schema);
+   if (fixedClass.isInstance(old))
+     return old;
+   return newInstance(fixedClass);
+ }
+
+ /** Parameter list used to look up no-argument constructors. */
+ private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+ /** No-argument constructors already looked up, keyed by class. */
+ private static final Map<Class,Constructor> CTOR_CACHE =
+   new ConcurrentHashMap<Class,Constructor>();
+
+ /** Instantiates a class through its declared no-argument constructor, which
+  * is made accessible and cached after the first lookup.  Any failure is
+  * rethrown wrapped in a {@link RuntimeException}. */
+ @SuppressWarnings("unchecked")
+ private static Object newInstance(Class c) {
+   try {
+     Constructor ctor = CTOR_CACHE.get(c);
+     if (ctor == null) {
+       ctor = c.getDeclaredConstructor(EMPTY_ARRAY);
+       ctor.setAccessible(true);
+       CTOR_CACHE.put(c, ctor);
+     }
+     return ctor.newInstance();
+   } catch (Exception e) {
+     throw new RuntimeException(e);
+   }
+ }
+
}
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
---
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
(original)
+++
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificDatumWriter.java
Wed Nov 11 23:53:00 2009
@@ -17,11 +17,14 @@
*/
package org.apache.avro.specific;
+import java.io.IOException;
+
import org.apache.avro.Schema;
-import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.Encoder;
/** {@link org.apache.avro.io.DatumWriter DatumWriter} for generated Java
classes. */
-public class SpecificDatumWriter extends ReflectDatumWriter {
+public class SpecificDatumWriter extends GenericDatumWriter<Object> {
public SpecificDatumWriter() {}
public SpecificDatumWriter(Class c) {
@@ -32,9 +35,24 @@
super(schema, SpecificData.get());
}
+ /** For subclasses: passes the root schema and the given
+  * {@link SpecificData} to the {@link GenericDatumWriter} constructor. */
+ protected SpecificDatumWriter(Schema root, SpecificData specificData) {
+ super(root, specificData);
+ }
+
+ /** For subclasses: passes the given {@link SpecificData} to the
+  * {@link GenericDatumWriter} constructor, without a root schema. */
+ protected SpecificDatumWriter(SpecificData specificData) {
+ super(specificData);
+ }
+
+ @Override
protected Object getField(Object record, String name, int position) {
return ((SpecificRecord)record).get(position);
}
+ /** Writes a Java enum constant as its ordinal; the schema argument is not
+  * consulted. */
+ @Override
+ protected void writeEnum(Schema schema, Object datum, Encoder out)
+   throws IOException {
+   Enum symbol = (Enum)datum;
+   out.writeEnum(symbol.ordinal());
+ }
+
}
Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificFixed.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificFixed.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificFixed.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificFixed.java Wed
Nov 11 23:53:00 2009
@@ -18,7 +18,6 @@
package org.apache.avro.specific;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.FixedSize;
/** Base class for generated fixed-sized data classes. */
public abstract class SpecificFixed extends GenericData.Fixed {
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificRequestor.java
Wed Nov 11 23:53:00 2009
@@ -19,34 +19,40 @@
package org.apache.avro.specific;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.lang.reflect.InvocationHandler;
+import java.util.Map;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.reflect.ReflectRequestor;
+import org.apache.avro.ipc.Requestor;
/** {@link org.apache.avro.ipc.Requestor Requestor} for generated
interfaces. */
-public class SpecificRequestor extends ReflectRequestor {
+public class SpecificRequestor extends Requestor implements InvocationHandler {
public SpecificRequestor(Class<?> iface, Transceiver transceiver)
throws IOException {
- this(iface, transceiver, SpecificData.get());
+ this(SpecificData.get().getProtocol(iface), transceiver);
}
- public SpecificRequestor(Class<?> iface, Transceiver transceiver,
- SpecificData specificData)
- throws IOException {
- this(specificData.getProtocol(iface), transceiver);
- }
-
- private SpecificRequestor(Protocol protocol, Transceiver transceiver)
+ protected SpecificRequestor(Protocol protocol, Transceiver transceiver)
throws IOException {
super(protocol, transceiver);
}
-
+
+ /** Proxy callback: issues a request named after the invoked method, passing
+  * the call's argument array as the request. */
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
+   throws Throwable {
+   String messageName = method.getName();
+   return request(messageName, args);
+ }
+
protected DatumWriter<Object> getDatumWriter(Schema schema) {
return new SpecificDatumWriter(schema);
}
@@ -55,6 +61,26 @@
return new SpecificDatumReader(schema);
}
+ /** Writes each element of the request, an <code>Object[]</code> of call
+  * arguments, with a writer for the field schema at the same position. */
+ @Override
+ public void writeRequest(Schema schema, Object request, Encoder out)
+   throws IOException {
+   Object[] args = (Object[])request;
+   int position = 0;
+   for (Map.Entry<String, Schema> field : schema.getFieldSchemas()) {
+     DatumWriter<Object> writer = getDatumWriter(field.getValue());
+     writer.write(args[position], out);
+     position++;
+   }
+ }
+
+ /** Reads a response into a new object using a reader for the schema. */
+ @Override
+ public Object readResponse(Schema schema, Decoder in) throws IOException {
+   DatumReader<Object> reader = getDatumReader(schema);
+   return reader.read(null, in);
+ }
+
+ /** Reads an error using a reader for the schema and casts it to
+  * {@link AvroRemoteException}. */
+ @Override
+ public AvroRemoteException readError(Schema schema, Decoder in)
+   throws IOException {
+   Object error = getDatumReader(schema).read(null, in);
+   return (AvroRemoteException)error;
+ }
+
/** Create a proxy instance whose methods invoke RPCs. */
public static Object getClient(Class<?> iface, Transceiver transciever)
throws IOException {
Modified:
hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificResponder.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificResponder.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificResponder.java
(original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificResponder.java
Wed Nov 11 23:53:00 2009
@@ -18,17 +18,35 @@
package org.apache.avro.specific;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
import org.apache.avro.Schema;
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.ReflectResponder;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Responder;
/** {@link org.apache.avro.ipc.Responder Responder} for generated
interfaces.*/
-public class SpecificResponder extends ReflectResponder {
+public class SpecificResponder extends Responder {
+ private Object impl;
+
public SpecificResponder(Class iface, Object impl) {
- super(iface, impl);
+ this(SpecificData.get().getProtocol(iface), impl);
}
+ public SpecificResponder(Protocol protocol, Object impl) {
+ super(protocol);
+ this.impl = impl;
+ }
+
protected DatumWriter<Object> getDatumWriter(Schema schema) {
return new SpecificDatumWriter(schema);
}
@@ -37,5 +55,48 @@
return new SpecificDatumReader(schema);
}
+ /** Reads one value per field of the request schema, in field order, and
+  * returns them as an <code>Object[]</code>. */
+ @Override
+ public Object readRequest(Schema schema, Decoder in) throws IOException {
+   Object[] args = new Object[schema.getFields().size()];
+   int position = 0;
+   for (Map.Entry<String, Schema> field : schema.getFieldSchemas()) {
+     DatumReader<Object> reader = getDatumReader(field.getValue());
+     args[position] = reader.read(null, in);
+     position++;
+   }
+   return args;
+ }
+
+ /** Writes the response using a writer for the schema. */
+ @Override
+ public void writeResponse(Schema schema, Object response, Encoder out)
+   throws IOException {
+   DatumWriter<Object> writer = getDatumWriter(schema);
+   writer.write(response, out);
+ }
+
+ /** Writes the error using a writer for the schema. */
+ @Override
+ public void writeError(Schema schema, AvroRemoteException error,
+                        Encoder out) throws IOException {
+   DatumWriter<Object> writer = getDatumWriter(schema);
+   writer.write(error, out);
+ }
+
+ /** Dispatches a message to the implementation object by reflection.
+  * The target method is looked up by the message name and by the Java classes
+  * of the request's field schemas, then invoked with the request cast to
+  * <code>Object[]</code>.  An {@link AvroRemoteException} thrown by the
+  * target is rethrown as-is; any other reflective failure is wrapped in an
+  * {@link AvroRuntimeException}. */
+ @Override
+ public Object respond(Message message, Object request)
+   throws AvroRemoteException {
+   Class[] signature = new Class[message.getRequest().getFields().size()];
+   int position = 0;
+   try {
+     for (Map.Entry<String,Schema> field : message.getRequest().getFieldSchemas()) {
+       signature[position] = SpecificData.get().getClass(field.getValue());
+       position++;
+     }
+     Method target = impl.getClass().getMethod(message.getName(), signature);
+     return target.invoke(impl, (Object[])request);
+   } catch (InvocationTargetException e) {
+     Throwable cause = e.getTargetException();
+     if (!(cause instanceof AvroRemoteException))
+       throw new AvroRuntimeException(e);
+     throw (AvroRemoteException)cause;
+   } catch (NoSuchMethodException e) {
+     throw new AvroRuntimeException(e);
+   } catch (IllegalAccessException e) {
+     throw new AvroRuntimeException(e);
+   }
+ }
+
}
Modified:
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=835165&r1=835164&r2=835165&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
(original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
Wed Nov 11 23:53:00 2009
@@ -132,7 +132,7 @@
ReflectData reflectData = ReflectData.AllowNull.get();
Schema schema = reflectData.getSchema(BarRecord.class);
DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
- new ReflectDatumWriter(schema, reflectData));
+ new ReflectDatumWriter(BarRecord.class, reflectData));
// test writing to a file
CheckList check = new CheckList();