Author: cutting
Date: Fri Jun 25 23:40:37 2010
New Revision: 958149
URL: http://svn.apache.org/viewvc?rev=958149&view=rev
Log:
AVRO-578. Java: add payload data to RPC context for use by plugins.
Contributed by Patrick Wendell.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jun 25 23:40:37 2010
@@ -22,6 +22,9 @@ Avro 1.4.0 (unreleased)
AVRO-567. Add command-line tools for text file import & export.
(Patrick Wendell via cutting)
+ AVRO-578. Java: add payload data to RPC context for use by
+ plugins. (Patrick Wendell via cutting)
+
IMPROVEMENTS
AVRO-584. Update Histogram for Stats Plugin
(Patrick Wendell via philz)
Modified:
avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
---
avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
(original)
+++
avro/trunk/lang/java/src/java/org/apache/avro/ipc/ByteBufferOutputStream.java
Fri Jun 25 23:40:37 2010
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
/** Utility to collect data written to an {@link OutputStream} in {@link
@@ -44,8 +44,24 @@ public class ByteBufferOutputStream exte
return result;
}
+ /** Prepend a list of ByteBuffers to this stream. */
+ public void prepend(List<ByteBuffer> lists) {
+ for (ByteBuffer buffer: lists) {
+ buffer.position(buffer.limit());
+ }
+ buffers.addAll(0, lists);
+ }
+
+ /** Append a list of ByteBuffers to this stream. */
+ public void append(List<ByteBuffer> lists) {
+ for (ByteBuffer buffer: lists) {
+ buffer.position(buffer.limit());
+ }
+ buffers.addAll(lists);
+ }
+
public void reset() {
- buffers = new ArrayList<ByteBuffer>(1);
+ buffers = new LinkedList<ByteBuffer>();
buffers.add(ByteBuffer.allocate(BUFFER_SIZE));
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCContext.java Fri Jun
25 23:40:37 2010
@@ -19,6 +19,7 @@ package org.apache.avro.ipc;
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.avro.Protocol.Message;
@@ -28,7 +29,9 @@ import org.apache.avro.util.Utf8;
* This class represents the context of an RPC call or RPC handshake.
* Designed to provide information to RPC plugin writers,
* this class encapsulates information about the rpc exchange,
- * including handshake and call metadata.
+ * including handshake and call metadata. Note: this data includes
+ * full copies of the RPC payload, so plugins which store RPCContexts
+ * beyond the life of each call should be conscious of memory use.
*
*/
public class RPCContext {
@@ -39,6 +42,8 @@ public class RPCContext {
protected Object response;
protected Exception error;
private Message message;
+ List<ByteBuffer> requestPayload;
+ List<ByteBuffer> responsePayload;
/**
* This is an access method for the handshake state
@@ -148,10 +153,44 @@ public class RPCContext {
public boolean isError() {
return error != null;
}
-
+
+ /** Sets the {@link Message} corresponding to this RPC */
public void setMessage(Message message) {
this.message = message;
}
+ /** Returns the {@link Message} corresponding to this RPC
+ * @return this RPC's {@link Message}
+ */
public Message getMessage() { return message; }
+
+ /** Sets the serialized payload of the request in this RPC. Will
+ * not include handshake or meta-data. */
+ public void setRequestPayload(List<ByteBuffer> payload) {
+ this.requestPayload = payload;
+ }
+
+ /** Returns the serialized payload of the request in this RPC. Will
+ * not include handshake or meta-data. If the request payload has not been
+ * set yet, returns null.
+ *
+ * @return this RPC's request payload.*/
+ public List<ByteBuffer> getRequestPayload() {
+ return this.requestPayload;
+ }
+
+ /** Returns the serialized payload of the response in this RPC. Will
+ * not include handshake or meta-data. If the response payload has not been
+ * set yet, returns null.
+ *
+ * @return this RPC's response payload.*/
+ public List<ByteBuffer> getResponsePayload() {
+ return this.responsePayload;
+ }
+
+ /** Sets the serialized payload of the response in this RPC. Will
+ * not include handshake or meta-data. */
+ public void setResponsePayload(List<ByteBuffer> payload) {
+ this.responsePayload = payload;
+ }
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/RPCPlugin.java Fri Jun 25
23:40:37 2010
@@ -43,12 +43,13 @@ public class RPCPlugin {
* @param context the handshake rpc context
*/
public void clientFinishConnect(RPCContext context) { }
-
+
/**
* This method is invoked at the client before it issues the RPC call.
* @param context the per-call rpc context (in/out parameter)
*/
public void clientSendRequest(RPCContext context) { }
+
/**
* This method is invoked at the RPC server when the request is received,
@@ -58,17 +59,18 @@ public class RPCPlugin {
public void serverReceiveRequest(RPCContext context) { }
/**
- * This method is invoked at the server after the call is executed,
- * but before the response is returned to the client
+ * This method is invoked at the server after the response payload has been
+ * serialized, but before the response is returned to the client
* @param context the per-call rpc context (in/out parameter)
*/
public void serverSendResponse(RPCContext context) { }
-
+
/**
* This method is invoked at the client after the call is executed,
* and after the client receives the response
* @param context the per-call rpc context
*/
public void clientReceiveResponse(RPCContext context) { }
+
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Requestor.java Fri Jun 25
23:40:37 2010
@@ -92,28 +92,36 @@ public abstract class Requestor {
do {
ByteBufferOutputStream bbo = new ByteBufferOutputStream();
Encoder out = new BinaryEncoder(bbo);
-
- writeHandshake(out); // prepend handshake if needed
-
+
// use local protocol to write request
m = getLocal().getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("Not a local message: "+messageName);
context.setMessage(m);
+
+ writeRequest(m.getRequest(), request, out); // write request payload
+ List<ByteBuffer> payload = bbo.getBufferList();
+ context.setRequestPayload(payload);
for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientSendRequest(context);
+ plugin.clientSendRequest(context); // get meta-data from plugins
}
+ writeHandshake(out); // prepend handshake if needed
META_WRITER.write(context.requestCallMeta(), out);
out.writeString(m.getName()); // write message name
- writeRequest(m.getRequest(), request, out); // write request payload
+ bbo.append(payload);
+
+ List<ByteBuffer> requestBytes = bbo.getBufferList();
+
if (m.isOneWay() && t.isConnected()) { // send one-way message
- t.writeBuffers(bbo.getBufferList());
+ t.writeBuffers(requestBytes);
+
return null;
} else { // two-way message
- List<ByteBuffer> response = t.transceive(bbo.getBufferList());
+ List<ByteBuffer> response = t.transceive(requestBytes);
+ context.setResponsePayload(response);
ByteBufferInputStream bbi = new ByteBufferInputStream(response);
in = DecoderFactory.defaultFactory().createBinaryDecoder(bbi, in);
}
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/Responder.java Fri Jun 25
23:40:37 2010
@@ -96,15 +96,18 @@ public abstract class Responder {
Decoder in = DecoderFactory.defaultFactory().createBinaryDecoder(
new ByteBufferInputStream(buffers), null);
ByteBufferOutputStream bbo = new ByteBufferOutputStream();
- Encoder out = new BinaryEncoder(bbo);
+ BinaryEncoder out = new BinaryEncoder(bbo);
Exception error = null;
RPCContext context = new RPCContext();
+ List<ByteBuffer> payload = null;
+ List<ByteBuffer> handshake = null;
boolean wasConnected = connection != null && connection.isConnected();
try {
Protocol remote = handshake(in, out, connection);
if (remote == null) // handshake failed
return bbo.getBufferList();
-
+ handshake = bbo.getBufferList();
+
// read request using remote protocol specification
context.setRequestCallMeta(META_READER.read(null, in));
String messageName = in.readString(null).toString();
@@ -112,10 +115,10 @@ public abstract class Responder {
if (rm == null)
throw new AvroRuntimeException("No such remote message: "+messageName);
- context.setMessage(rm);
-
Object request = readRequest(rm.getRequest(), in);
+ context.setRequestPayload(buffers);
+ context.setMessage(rm);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.serverReceiveRequest(context);
}
@@ -129,6 +132,7 @@ public abstract class Responder {
throw new AvroRuntimeException("Not both one-way: "+messageName);
Object response = null;
+
try {
response = respond(m, request);
context.setResponse(response);
@@ -140,27 +144,33 @@ public abstract class Responder {
if (m.isOneWay() && wasConnected) // no response data
return null;
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.serverSendResponse(context);
- }
-
- META_WRITER.write(context.responseCallMeta(), out);
out.writeBoolean(error != null);
if (error == null)
writeResponse(m.getResponse(), response, out);
else
writeError(m.getErrors(), error, out);
-
} catch (Exception e) { // system error
LOG.warn("system error", e);
context.setError(e);
bbo = new ByteBufferOutputStream();
out = new BinaryEncoder(bbo);
- META_WRITER.write(context.responseCallMeta(), out);
out.writeBoolean(true);
writeError(Protocol.SYSTEM_ERRORS, new Utf8(e.toString()), out);
}
-
+
+ payload = bbo.getBufferList();
+
+ // Grab meta-data from plugins
+ context.setResponsePayload(payload);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.serverSendResponse(context);
+ }
+ META_WRITER.write(context.responseCallMeta(), out);
+
+ // Prepend handshake and append payload
+ bbo.prepend(handshake);
+ bbo.append(payload);
+
return bbo.getBufferList();
}
Modified:
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
(original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
Fri Jun 25 23:40:37 2010
@@ -80,6 +80,7 @@ public class SocketTransceiver extends T
throws IOException {
if (buffers == null) return; // no data to write
for (ByteBuffer buffer : buffers) {
+ if (buffer.limit() == 0) continue;
writeLength(buffer.limit()); // length-prefix
channel.write(buffer);
}
Modified:
avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java?rev=958149&r1=958148&r2=958149&view=diff
==============================================================================
--- avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
(original)
+++ avro/trunk/lang/java/src/test/java/org/apache/avro/RPCMetaTestPlugin.java
Fri Jun 25 23:40:37 2010
@@ -31,7 +31,8 @@ import org.apache.avro.util.Utf8;
* This plugin tests handshake and call state by passing a string as metadata,
* slowly building it up at each instrumentation point, testing it as it goes.
* Finally, after the call or handshake is complete, the constructed string is
- * tested.
+ * tested. It also tests that RPC context data is appropriately filled in
+ * along the way by Requestor and Responder classes.
*/
public final class RPCMetaTestPlugin extends RPCPlugin {
@@ -52,6 +53,8 @@ public final class RPCMetaTestPlugin ext
Assert.assertNotNull(context.requestHandshakeMeta());
Assert.assertNotNull(context.responseHandshakeMeta());
+ Assert.assertNull(context.getRequestPayload());
+ Assert.assertNull(context.getResponsePayload());
if (!context.requestHandshakeMeta().containsKey(key)) return;
@@ -72,6 +75,8 @@ public final class RPCMetaTestPlugin ext
public void clientFinishConnect(RPCContext context) {
Map<Utf8,ByteBuffer> handshakeMeta = context.responseHandshakeMeta();
+ Assert.assertNull(context.getRequestPayload());
+ Assert.assertNull(context.getResponsePayload());
Assert.assertNotNull(handshakeMeta);
if (!handshakeMeta.containsKey(key)) return;
@@ -96,13 +101,18 @@ public final class RPCMetaTestPlugin ext
ByteBuffer buf = ByteBuffer.wrap("ap".getBytes());
context.requestCallMeta().put(key, buf);
Assert.assertNotNull(context.getMessage());
+ Assert.assertNotNull(context.getRequestPayload());
+ Assert.assertNull(context.getResponsePayload());
}
@Override
public void serverReceiveRequest(RPCContext context) {
Map<Utf8,ByteBuffer> meta = context.requestCallMeta();
- Assert.assertNotNull(meta);
+ Assert.assertNotNull(meta);
+ Assert.assertNotNull(context.getMessage());
+ Assert.assertNotNull(context.getRequestPayload());
+ Assert.assertNull(context.getResponsePayload());
if (!meta.containsKey(key)) return;
@@ -117,14 +127,15 @@ public final class RPCMetaTestPlugin ext
buf = ByteBuffer.wrap((partialstr + "a").getBytes());
Assert.assertTrue(buf.remaining() > 0);
meta.put(key, buf);
-
- Assert.assertNotNull(context.getMessage());
}
@Override
public void serverSendResponse(RPCContext context) {
Assert.assertNotNull(context.requestCallMeta());
Assert.assertNotNull(context.responseCallMeta());
+
+ Assert.assertNotNull(context.getRequestPayload());
+ Assert.assertNotNull(context.getResponsePayload());
if (!context.requestCallMeta().containsKey(key)) return;
@@ -144,6 +155,8 @@ public final class RPCMetaTestPlugin ext
@Override
public void clientReceiveResponse(RPCContext context) {
Assert.assertNotNull(context.responseCallMeta());
+ Assert.assertNotNull(context.getRequestPayload());
+ Assert.assertNotNull(context.getResponsePayload());
if (!context.responseCallMeta().containsKey(key)) return;