Author: cutting
Date: Wed Jul 1 21:02:13 2009
New Revision: 790377
URL: http://svn.apache.org/viewvc?rev=790377&view=rev
Log:
AVRO-66. Add per-call metadata to Java and Python. Contributed by George
Porter & cutting.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
hadoop/avro/trunk/src/py/avro/ipc.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Jul 1 21:02:13 2009
@@ -17,6 +17,9 @@
AVRO-46. Optimized RPC handshake protocol for Python. (sharad)
+ AVRO-66. Add per-call RPC metadata to Java and Python. (George
+ Porter & cutting)
+
NEW FEATURES
AVRO-6. Permit easier implementation of alternate generic data
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Wed Jul 1 21:02:13 2009
@@ -29,6 +29,8 @@
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.BinaryEncoder;
@@ -43,6 +45,13 @@
public abstract class Requestor {
private static final Logger LOG = LoggerFactory.getLogger(Requestor.class);
+ private static final Schema META =
+ Schema.createMap(Schema.create(Schema.Type.BYTES));
+ private static final GenericDatumReader<Map<Utf8,ByteBuffer>> META_READER =
+ new GenericDatumReader<Map<Utf8,ByteBuffer>>(META);
+ private static final GenericDatumWriter<Map<Utf8,ByteBuffer>> META_WRITER =
+ new GenericDatumWriter<Map<Utf8,ByteBuffer>>(META);
+
private Protocol local;
private Protocol remote;
private boolean established, sendLocalText;
@@ -63,6 +72,7 @@
throws IOException {
Decoder in;
Message m;
+ Map<Utf8,ByteBuffer> requestMeta = new HashMap<Utf8,ByteBuffer>();
do {
ByteBufferOutputStream bbo = new ByteBufferOutputStream();
Encoder out = new BinaryEncoder(bbo);
@@ -75,6 +85,7 @@
if (m == null)
throw new AvroRuntimeException("Not a local message: "+messageName);
+ META_WRITER.write(requestMeta, out);
out.writeString(m.getName()); // write message name
writeRequest(m.getRequest(), request, out); // write request payload
@@ -91,6 +102,7 @@
m = getRemote().getMessages().get(messageName);
if (m == null)
throw new AvroRuntimeException("Not a remote message: "+messageName);
+ Map<Utf8,ByteBuffer> responseMeta = META_READER.read(null, in);
if (!in.readBoolean()) { // no error
return readResponse(m.getResponse(), in);
} else {
Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Wed Jul 1 21:02:13 2009
@@ -28,6 +28,8 @@
import org.apache.avro.*;
import org.apache.avro.Protocol.Message;
import org.apache.avro.util.*;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.*;
import org.apache.avro.specific.*;
@@ -35,6 +37,13 @@
public abstract class Responder {
private static final Logger LOG = LoggerFactory.getLogger(Responder.class);
+ private static final Schema META =
+ Schema.createMap(Schema.create(Schema.Type.BYTES));
+ private static final GenericDatumReader<Map<Utf8,ByteBuffer>> META_READER =
+ new GenericDatumReader<Map<Utf8,ByteBuffer>>(META);
+ private static final GenericDatumWriter<Map<Utf8,ByteBuffer>> META_WRITER =
+ new GenericDatumWriter<Map<Utf8,ByteBuffer>>(META);
+
private Map<Transceiver,Protocol> remotes
= Collections.synchronizedMap(new WeakHashMap<Transceiver,Protocol>());
private Map<MD5,Protocol> protocols
@@ -69,6 +78,7 @@
return bbo.getBufferList();
// read request using remote protocol specification
+ Map<Utf8,ByteBuffer> requestMeta = META_READER.read(null, in);
String messageName = in.readString(null).toString();
Message m = remote.getMessages().get(messageName);
if (m == null)
@@ -90,6 +100,8 @@
error = new AvroRemoteException(new Utf8(e.toString()));
}
+ Map<Utf8,ByteBuffer> responseMeta = new HashMap<Utf8,ByteBuffer>();
+ META_WRITER.write(responseMeta, out);
out.writeBoolean(error != null);
if (error == null)
writeResponse(m.getResponse(), response, out);
Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Wed Jul 1 21:02:13 2009
@@ -20,6 +20,7 @@
import avro.schema as schema
import avro.protocol as protocol
import avro.io as io
+import avro.genericio as genericio
import avro.reflectio as reflectio
class TransceiverBase(object):
@@ -79,6 +80,10 @@
_REMOTE_HASHES = dict()
_REMOTE_PROTOCOLS = dict()
+_META_SCHEMA = schema.parse("{\"type\": \"map\", \"values\": \"bytes\"}")
+_META_WRITER = genericio.DatumWriter(_META_SCHEMA)
+_META_READER = genericio.DatumReader(_META_SCHEMA)
+
class RequestorBase(object):
"""Base class for the client side of a protocol interaction."""
@@ -107,6 +112,8 @@
encoder = io.Encoder(buf)
if not self.__established:
self.__writehandshake(encoder)
+ requestmeta = dict()
+ _META_WRITER.write(requestmeta, encoder)
m = self.__localproto.getmessages().get(msgname)
if m is None:
raise schema.AvroException("Not a local message: "+msgname.__str__())
@@ -116,6 +123,7 @@
decoder = io.Decoder(cStringIO.StringIO(response))
if not self.__established:
self.__readhandshake(decoder)
+ responsemeta = _META_READER.read(decoder)
m = self.getremote().getmessages().get(msgname)
if m is None:
raise schema.AvroException("Not a remote message: "+msgname.__str__())
@@ -197,6 +205,7 @@
buf = cStringIO.StringIO()
encoder = io.Encoder(buf)
error = None
+ responsemeta = dict()
try:
remoteproto = self.__handshake(transceiver, decoder, encoder)
@@ -204,6 +213,7 @@
return buf.getvalue()
#read request using remote protocol specification
+ requestmeta = _META_READER.read(decoder)
msgname = decoder.readutf8()
m = remoteproto.getmessages().get(msgname)
if m is None:
@@ -220,6 +230,7 @@
error = e
except Exception, e:
error = AvroRemoteException(unicode(e.__str__()))
+ _META_WRITER.write(responsemeta, encoder)
encoder.writeboolean(error is not None)
if error is None:
self.writeresponse(m.getresponse(), response, encoder)
@@ -229,6 +240,7 @@
error = AvroRemoteException(unicode(e.__str__()))
buf = cStringIO.StringIO()
encoder = io.Encoder(buf)
+ _META_WRITER.write(responsemeta, encoder)
encoder.writeboolean(True)
self.writeerror(protocol._SYSTEM_ERRORS, error, encoder)