I put together a SocketTransceiver for Python so that I could compare
performance against the HTTP one, but I'm having some trouble getting it
to work. I'm suspect it's something really dumb, that I'm just too
close to spot it, and that someone else might.
The patch against 1.4 is attached. It causes the Java server to produce
the following exception:
java.io.EOFException
at
org.apache.avro.ipc.ByteBufferInputStream.getBuffer(ByteBufferInputStream.java:84)
at
org.apache.avro.ipc.ByteBufferInputStream.read(ByteBufferInputStream.java:46)
at org.apache.avro.io.BinaryDecoder
$InputStreamByteSource.readRaw(BinaryDecoder.java:815)
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:340)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:265)
at
org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:99)
at
org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:318)
at
org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:229)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:117)
at
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:105)
at org.apache.avro.ipc.Responder.respond(Responder.java:112)
at org.apache.avro.ipc.SocketServer
$Connection.run(SocketServer.java:91)
at java.lang.Thread.run(Thread.java:636)
Anyone?
--
Eric Evans
[email protected]
diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index f302d63..5b84326 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -16,7 +16,7 @@
"""
Support for inter-process calls.
"""
-import httplib
+import httplib, socket, struct
try:
from cStringIO import StringIO
except ImportError:
@@ -478,6 +478,72 @@ class HTTPTransceiver(object):
def close(self):
self.conn.close()
+class SocketTransceiver(object):
+ """
+ A simple socket-based transceiver implementation.
+ Useful for clients but not for servers
+ """
+ def __init__(self, host, port, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
+ self._sock = socket.create_connection((host, port), timeout)
+
+ # read-only properties
+ sock = property(lambda self: self._sock)
+ remote_name = property(lambda self: self.sock.getsockname())
+
+ def transceive(self, request):
+ self.write_framed_message(request)
+ return self.read_framed_message()
+
+ def read_framed_message(self):
+ msg = StringIO()
+ while True:
+ buff = StringIO()
+ size = self.__read_length()
+ if size == 0:
+ return msg.getvalue()
+ while buff.tell() < size:
+ chunk = self._sock.recv(size-buff.tell())
+ if chunk == '':
+ raise ConnectionClosedException("socket read 0 bytes")
+ buff.write(chunk)
+ msg.write(buff.getvalue())
+
+ def write_framed_message(self, msg):
+ totalsize = len(msg)
+ totalsent = 0
+ while (totalsize - totalsent) > 0:
+ if (totalsize - totalsent) > BUFFER_SIZE:
+ batchsize = BUFFER_SIZE
+ else:
+ batchsize = (totalsize - totalsent)
+ self.__write_buffer(msg[totalsent:(totalsent + batchsize)])
+ totalsent += batchsize
+ self.__write_length(0) # null terminate
+
+ def __write_buffer(self, msg):
+ size = len(msg)
+ self.__write_length(size)
+ totalsent = 0
+ while totalsent < size:
+ sent = self._sock.send(msg[totalsent:])
+ if sent == 0:
+ raise ConnectionClosedException("socket sent 0 bytes")
+ totalsent += sent
+
+ def __write_length(self, length):
+ if self._sock.sendall(struct.pack('>i', length)) == 0:
+ raise ConnectionClosedException("socket sent 0 bytes")
+
+ def __read_length(self):
+ length = self._sock.recv(4)
+ if length == '':
+ raise ConnectionClosedException("socket read 0 bytes")
+ return struct.unpack('>i', length)[0]
+
+ def close(self):
+ self._sock.shutdown(socket.SHUT_RDWR)
+ self._sock.close()
+
#
# Server Implementations (none yet)
#