Author: cutting
Date: Mon Feb 21 17:20:23 2011
New Revision: 1073077
URL: http://svn.apache.org/viewvc?rev=1073077&view=rev
Log:
AVRO-755. Java: Fix SpecificResponder to correctly handle message parameter
lists that differ between client and server.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Feb 21 17:20:23 2011
@@ -223,6 +223,9 @@ Avro 1.5.0 (unreleased)
AVRO-759. Java: Fix NullPointerException when some but not all
fields are aliased. (Xiaolu Ye via cutting)
+ AVRO-755. Java: Fix SpecificResponder to correctly handle message
+ parameter lists that differ between client and server. (cutting)
+
Avro 1.4.1 (13 October 2010)
NEW FEATURES
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java Mon Feb 21 17:20:23 2011
@@ -60,6 +60,7 @@ public class SpecificData extends Generi
case RECORD:
case ENUM:
String name = schema.getFullName();
+ if (name == null) return null;
Class c = classCache.get(name);
if (c == null) {
try {
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Mon Feb 21 17:20:23 2011
@@ -125,8 +125,12 @@ public abstract class Responder {
Message rm = remote.getMessages().get(messageName);
if (rm == null)
throw new AvroRuntimeException("No such remote message: "+messageName);
-
- Object request = readRequest(rm.getRequest(), in);
+ Message m = getLocal().getMessages().get(messageName);
+ if (m == null)
+ throw new AvroRuntimeException("No message named "+messageName
+ +" in "+getLocal());
+
+ Object request = readRequest(rm.getRequest(), m.getRequest(), in);
context.setMessage(rm);
for (RPCPlugin plugin : rpcMetaPlugins) {
@@ -134,10 +138,6 @@ public abstract class Responder {
}
// create response using local protocol specification
- Message m = getLocal().getMessages().get(messageName);
- if (m == null)
- throw new AvroRuntimeException("No message named "+messageName
- +" in "+getLocal());
if ((m.isOneWay() != rm.isOneWay()) && wasConnected)
throw new AvroRuntimeException("Not both one-way: "+messageName);
@@ -241,7 +241,7 @@ public abstract class Responder {
throws Exception;
/** Reads a request message. */
- public abstract Object readRequest(Schema schema, Decoder in)
+ public abstract Object readRequest(Schema actual, Schema expected, Decoder in)
throws IOException;
/** Writes a response message. */
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericResponder.java Mon Feb 21 17:20:23 2011
@@ -27,6 +27,8 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
import org.apache.avro.ipc.Responder;
/** {@link Responder} implementation for generic Java data. */
@@ -36,15 +38,24 @@ public abstract class GenericResponder e
super(local);
}
+ protected DatumWriter<Object> getDatumWriter(Schema schema) {
+ return new GenericDatumWriter<Object>(schema);
+ }
+
+ protected DatumReader<Object> getDatumReader(Schema actual, Schema expected) {
+ return new GenericDatumReader<Object>(actual, expected);
+ }
+
@Override
- public Object readRequest(Schema schema, Decoder in) throws IOException {
- return new GenericDatumReader<Object>(schema).read(null, in);
+ public Object readRequest(Schema actual, Schema expected, Decoder in)
+ throws IOException {
+ return getDatumReader(actual, expected).read(null, in);
}
@Override
public void writeResponse(Schema schema, Object response, Encoder out)
throws IOException {
- new GenericDatumWriter<Object>(schema).write(response, out);
+ getDatumWriter(schema).write(response, out);
}
@Override
@@ -52,7 +63,7 @@ public abstract class GenericResponder e
Encoder out) throws IOException {
if (error instanceof AvroRemoteException)
error = ((AvroRemoteException)error).getValue();
- new GenericDatumWriter<Object>(schema).write(error, out);
+ getDatumWriter(schema).write(error, out);
}
}
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/reflect/ReflectResponder.java Mon Feb 21 17:20:23 2011
@@ -46,8 +46,8 @@ public class ReflectResponder extends Sp
}
@Override
- protected DatumReader<Object> getDatumReader(Schema schema) {
- return new ReflectDatumReader<Object>(schema);
+ protected DatumReader<Object> getDatumReader(Schema actual, Schema expected) {
+ return new ReflectDatumReader<Object>(actual, expected);
}
@Override
Modified:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificResponder.java Mon Feb 21 17:20:23 2011
@@ -28,15 +28,15 @@ import org.apache.avro.Protocol.Message;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
-import org.apache.avro.ipc.Responder;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.ipc.generic.GenericResponder;
/** {@link org.apache.avro.ipc.Responder Responder} for generated interfaces.*/
-public class SpecificResponder extends Responder {
+public class SpecificResponder extends GenericResponder {
private Object impl;
private SpecificData data;
@@ -55,27 +55,14 @@ public class SpecificResponder extends R
this.data = data;
}
+ @Override
protected DatumWriter<Object> getDatumWriter(Schema schema) {
return new SpecificDatumWriter<Object>(schema);
}
- protected DatumReader<Object> getDatumReader(Schema schema) {
- return new SpecificDatumReader<Object>(schema);
- }
-
- @Override
- public Object readRequest(Schema schema, Decoder in) throws IOException {
- Object[] args = new Object[schema.getFields().size()];
- int i = 0;
- for (Schema.Field param : schema.getFields())
- args[i++] = getDatumReader(param.schema()).read(null, in);
- return args;
- }
-
@Override
- public void writeResponse(Schema schema, Object response, Encoder out)
- throws IOException {
- getDatumWriter(schema).write(response, out);
+ protected DatumReader<Object> getDatumReader(Schema actual, Schema expected) {
+ return new SpecificDatumReader<Object>(actual, expected);
}
@Override
@@ -86,14 +73,19 @@ public class SpecificResponder extends R
@Override
public Object respond(Message message, Object request) throws Exception {
- Class[] paramTypes = new Class[message.getRequest().getFields().size()];
+ int numParams = message.getRequest().getFields().size();
+ Object[] params = new Object[numParams];
+ Class[] paramTypes = new Class[numParams];
int i = 0;
try {
- for (Schema.Field param: message.getRequest().getFields())
- paramTypes[i++] = data.getClass(param.schema());
+ for (Schema.Field param: message.getRequest().getFields()) {
+ params[i] = ((GenericRecord)request).get(param.name());
+ paramTypes[i] = data.getClass(param.schema());
+ i++;
+ }
Method method = impl.getClass().getMethod(message.getName(), paramTypes);
method.setAccessible(true);
- return method.invoke(impl, (Object[])request);
+ return method.invoke(impl, params);
} catch (InvocationTargetException e) {
throw (Exception)e.getTargetException();
} catch (NoSuchMethodException e) {
Modified:
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=1073077&r1=1073076&r2=1073077&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java Mon Feb 21 17:20:23 2011
@@ -27,6 +27,9 @@ import org.apache.avro.ipc.SocketTransce
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.ipc.generic.GenericRequestor;
import org.apache.avro.test.Simple;
import org.apache.avro.test.Kind;
import org.apache.avro.test.MD5;
@@ -49,6 +52,8 @@ import java.io.LineNumberReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Random;
+import java.util.List;
+import java.util.ArrayList;
public class TestProtocolSpecific {
@@ -210,9 +215,41 @@ public class TestProtocolSpecific {
proxy.hello("hi!");
}
}
-
- @After
- public void testHandshakeOccursOnce() throws IOException{
+
+ @Test
+ /** Construct and use a protocol whose "hello" method has an extra
+ argument to check that schema is sent to parse request. */
+ public void testParamVariation() throws Exception {
+ Protocol protocol = new Protocol("Simple", "org.apache.avro.test");
+ List<Schema.Field> fields = new ArrayList<Schema.Field>();
+ fields.add(new Schema.Field("extra", Schema.create(Schema.Type.BOOLEAN),
+ null, null));
+ fields.add(new Schema.Field("greeting", Schema.create(Schema.Type.STRING),
+ null, null));
+ Protocol.Message message =
+ protocol.createMessage("hello",
+ null /* doc */,
+ Schema.createRecord(fields),
+ Schema.create(Schema.Type.STRING),
+ Schema.createUnion(new ArrayList<Schema>()));
+ protocol.getMessages().put("hello", message);
+ Transceiver t = createTransceiver();
+ try {
+ GenericRequestor r = new GenericRequestor(protocol, t);
+ addRpcPlugins(r);
+ GenericRecord params = new GenericData.Record(message.getRequest());
+ params.put("extra", Boolean.TRUE);
+ params.put("greeting", new Utf8("bob"));
+ Utf8 response = (Utf8)r.request("hello", params);
+ assertEquals(new Utf8("goodbye"), response);
+ } finally {
+ t.close();
+ server.close();
+ }
+ }
+
+ @AfterClass
+ public static void testHandshakeCount() throws IOException {
monitor.assertHandshake();
}
@@ -245,7 +282,7 @@ public class TestProtocolSpecific {
}
protected int getExpectedHandshakeCount() {
- return 1;
+ return 3;
}
public static class InteropTest {