Author: cutting
Date: Thu Jun 25 18:12:00 2009
New Revision: 788453
URL: http://svn.apache.org/viewvc?rev=788453&view=rev
Log:
AVRO-46. Optimized RPC handshake protocol for Python. Contributed by sharad.
Added:
hadoop/avro/trunk/src/py/avro/genericio.py
hadoop/avro/trunk/src/py/avro/genericipc.py
hadoop/avro/trunk/src/py/avro/reflectio.py
hadoop/avro/trunk/src/py/avro/reflectipc.py
Removed:
hadoop/avro/trunk/src/py/avro/generic.py
hadoop/avro/trunk/src/py/avro/reflect.py
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/build.xml
hadoop/avro/trunk/src/py/avro/ (props changed)
hadoop/avro/trunk/src/py/avro/io.py
hadoop/avro/trunk/src/py/avro/ipc.py
hadoop/avro/trunk/src/py/avro/protocol.py
hadoop/avro/trunk/src/test/py/interoptests.py
hadoop/avro/trunk/src/test/py/testio.py
hadoop/avro/trunk/src/test/py/testioreflect.py
hadoop/avro/trunk/src/test/py/testipc.py
hadoop/avro/trunk/src/test/py/testipcreflect.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Jun 25 18:12:00 2009
@@ -15,6 +15,8 @@
ValueReader an abstract class named Decoder, and add concrete
implementations named BinaryEncoder and BinaryDecoder. (cutting)
+ AVRO-46. Optimized RPC handshake protocol for Python. (sharad)
+
NEW FEATURES
AVRO-6. Permit easier implementation of alternate generic data
Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Thu Jun 25 18:12:00 2009
@@ -263,7 +263,15 @@
<pathelement location="${basedir}/lib/py"/>
</path>
- <target name="generate-test-data" depends="compile-test-java">
+ <target name="init-py" depends="init, schemata">
+ <copy todir="${basedir}/src/py/avro">
+ <fileset dir="${build.dir}/src/org/apache/avro/ipc">
+ <include name="**/*.avsc"/>
+ </fileset>
+ </copy>
+ </target>
+
+ <target name="generate-test-data" depends="compile-test-java, init-py">
<mkdir dir="${test.java.build.dir}/data-files"/>
<java classname="org.apache.avro.RandomData"
classpathref="test.java.classpath">
@@ -283,7 +291,7 @@
</py-run>
</target>
- <target name="test-py" depends="init" description="Run python unit tests">
+ <target name="test-py" depends="init-py" description="Run python unit tests">
<taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask">
<classpath refid="java.classpath" />
</taskdef>
@@ -339,7 +347,7 @@
</py-test>
</target>
- <target name="start-rpc-daemons" depends="compile-test-java"
+ <target name="start-rpc-daemons" depends="compile-test-java, init-py"
description="Start the daemons for rpc interoperability tests">
<delete dir="${test.java.build.dir}/server-ports"/>
<mkdir dir="${test.java.build.dir}/server-ports"/>
@@ -530,6 +538,7 @@
<delete dir="${build.dir}"/>
<delete>
<fileset dir="src" includes="**/*.pyc" />
+ <fileset dir="${basedir}/src/py/avro" includes="**/*.avsc"/>
</delete>
</target>
Propchange: hadoop/avro/trunk/src/py/avro/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Jun 25 18:12:00 2009
@@ -1 +1,2 @@
*.pyc
+Handshake*.avsc
Added: hadoop/avro/trunk/src/py/avro/genericio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/genericio.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/genericio.py (added)
+++ hadoop/avro/trunk/src/py/avro/genericio.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,254 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+"""Generic representation for data.
+Represent Schema data with generic python types.
+
+Uses the following mapping:
+ * Schema records are implemented as dict.
+ * Schema arrays are implemented as list.
+ * Schema maps are implemented as dict.
+ * Schema strings are implemented as unicode.
+ * Schema bytes are implemented as str.
+ * Schema ints are implemented as int.
+ * Schema longs are implemented as long.
+ * Schema floats are implemented as float.
+ * Schema doubles are implemented as float.
+ * Schema booleans are implemented as bool.
+"""
+
+import avro.schema as schema
+import avro.io as io
+
+def _validatearray(schm, object):
+ if not isinstance(object, list):
+ return False
+ for elem in object:
+ if not validate(schm.getelementtype(), elem):
+ return False
+ return True
+
+def _validatemap(schm, object):
+ if not isinstance(object, dict):
+ return False
+ for k,v in object.items():
+ if not validate(schm.getvaluetype(), v):
+ return False
+ return True
+
+def _validaterecord(schm, object):
+ if not isinstance(object, dict):
+ return False
+ for field,fieldschema in schm.getfields():
+ if not validate(fieldschema, object.get(field)):
+ return False
+ return True
+
+def _validateunion(schm, object):
+ for elemtype in schm.getelementtypes():
+ if validate(elemtype, object):
+ return True
+ return False
+
+_validatefn = {
+ schema.NULL : lambda schm, object: object is None,
+ schema.BOOLEAN : lambda schm, object: isinstance(object, bool),
+ schema.STRING : lambda schm, object: isinstance(object, unicode),
+ schema.FLOAT : lambda schm, object: isinstance(object, float),
+ schema.DOUBLE : lambda schm, object: isinstance(object, float),
+ schema.BYTES : lambda schm, object: isinstance(object, str),
+ schema.INT : lambda schm, object: ((isinstance(object, long) or
+ isinstance(object, int)) and
+ io._INT_MIN_VALUE <= object <= io._INT_MAX_VALUE),
+ schema.LONG : lambda schm, object: ((isinstance(object, long) or
+ isinstance(object, int)) and
+ io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
+ schema.ENUM : lambda schm, object:
+ schm.getenumsymbols().__contains__(object),
+ schema.FIXED : lambda schm, object:
+ (isinstance(object, str) and
+ len(object) == schm.getsize()),
+ schema.ARRAY : _validatearray,
+ schema.MAP : _validatemap,
+ schema.RECORD : _validaterecord,
+ schema.UNION : _validateunion
+ }
+
+def validate(schm, object):
+ """Returns True if a python datum matches a schema."""
+ fn = _validatefn.get(schm.gettype())
+ if fn is not None:
+ return fn(schm, object)
+ else:
+ return False
+
+class DatumReader(io.DatumReaderBase):
+ """DatumReader for generic python objects."""
+
+ def __init__(self, schm=None):
+ self.setschema(schm)
+ self.__readfn = {
+ schema.BOOLEAN : lambda schm, decoder: decoder.readboolean(),
+ schema.STRING : lambda schm, decoder: decoder.readutf8(),
+ schema.INT : lambda schm, decoder: decoder.readint(),
+ schema.LONG : lambda schm, decoder: decoder.readlong(),
+ schema.FLOAT : lambda schm, decoder: decoder.readfloat(),
+ schema.DOUBLE : lambda schm, decoder: decoder.readdouble(),
+ schema.BYTES : lambda schm, decoder: decoder.readbytes(),
+ schema.FIXED : lambda schm, decoder:
+ (decoder.read(schm.getsize())),
+ schema.ARRAY : self.readarray,
+ schema.MAP : self.readmap,
+ schema.RECORD : self.readrecord,
+ schema.ENUM : self.readenum,
+ schema.UNION : self.readunion
+ }
+
+ def setschema(self, schm):
+ self.__schm = schm
+
+ def read(self, decoder):
+ return self.readdata(self.__schm, decoder)
+
+ def readdata(self, schm, decoder):
+ if schm.gettype() == schema.NULL:
+ return None
+ fn = self.__readfn.get(schm.gettype())
+ if fn is not None:
+ return fn(schm, decoder)
+ else:
+ raise AvroException("Unknown type: "+schema.stringval(schm));
+
+ def readmap(self, schm, decoder):
+ result = dict()
+ size = decoder.readlong()
+ if size != 0:
+ for i in range(0, size):
+ key = decoder.readutf8()
+ result[key] = self.readdata(schm.getvaluetype(), decoder)
+ decoder.readlong()
+ return result
+
+ def readarray(self, schm, decoder):
+ result = list()
+ size = decoder.readlong()
+ if size != 0:
+ for i in range(0, size):
+ result.append(self.readdata(schm.getelementtype(), decoder))
+ decoder.readlong()
+ return result
+
+ def readrecord(self, schm, decoder):
+ result = dict()
+ for field,fieldschema in schm.getfields():
+ result[field] = self.readdata(fieldschema, decoder)
+ return result
+
+ def readenum(self, schm, decoder):
+ index = decoder.readint()
+ return schm.getenumsymbols()[index]
+
+ def readunion(self, schm, decoder):
+ index = int(decoder.readlong())
+ return self.readdata(schm.getelementtypes()[index], decoder)
+
+class DatumWriter(io.DatumWriterBase):
+ """DatumWriter for generic python objects."""
+
+ def __init__(self, schm=None):
+ self.setschema(schm)
+ self.__writefn = {
+ schema.BOOLEAN : lambda schm, datum, encoder:
+ encoder.writeboolean(datum),
+ schema.STRING : lambda schm, datum, encoder:
+ encoder.writeutf8(datum),
+ schema.INT : lambda schm, datum, encoder:
+ encoder.writeint(datum),
+ schema.LONG : lambda schm, datum, encoder:
+ encoder.writelong(datum),
+ schema.FLOAT : lambda schm, datum, encoder:
+ encoder.writefloat(datum),
+ schema.DOUBLE : lambda schm, datum, encoder:
+ encoder.writedouble(datum),
+ schema.BYTES : lambda schm, datum, encoder:
+ encoder.writebytes(datum),
+ schema.FIXED : lambda schm, datum, encoder:
+ encoder.write(datum),
+ schema.ARRAY : self.writearray,
+ schema.MAP : self.writemap,
+ schema.RECORD : self.writerecord,
+ schema.ENUM : self.writeenum,
+ schema.UNION : self.writeunion
+ }
+
+ def setschema(self, schm):
+ self.__schm = schm
+
+ def write(self, datum, encoder):
+ self.writedata(self.__schm, datum, encoder)
+
+ def writedata(self, schm, datum, encoder):
+ if schm.gettype() == schema.NULL:
+ if datum is None:
+ return
+ raise io.AvroTypeException(schm, datum)
+ fn = self.__writefn.get(schm.gettype())
+ if fn is not None:
+ fn(schm, datum, encoder)
+ else:
+ raise io.AvroTypeException(schm, datum)
+
+ def writemap(self, schm, datum, encoder):
+ if not isinstance(datum, dict):
+ raise io.AvroTypeException(schm, datum)
+ if len(datum) > 0:
+ encoder.writelong(len(datum))
+ for k,v in datum.items():
+ encoder.writeutf8(k)
+ self.writedata(schm.getvaluetype(), v, encoder)
+ encoder.writelong(0)
+
+ def writearray(self, schm, datum, encoder):
+ if not isinstance(datum, list):
+ raise io.AvroTypeException(schm, datum)
+ if len(datum) > 0:
+ encoder.writelong(len(datum))
+ for item in datum:
+ self.writedata(schm.getelementtype(), item, encoder)
+ encoder.writelong(0)
+
+ def writerecord(self, schm, datum, encoder):
+ if not isinstance(datum, dict):
+ raise io.AvroTypeException(schm, datum)
+ for field,fieldschema in schm.getfields():
+ self.writedata(fieldschema, datum.get(field), encoder)
+
+ def writeunion(self, schm, datum, encoder):
+ index = self.resolveunion(schm, datum)
+ encoder.writelong(index)
+ self.writedata(schm.getelementtypes()[index], datum, encoder)
+
+ def writeenum(self, schm, datum, encoder):
+ index = schm.getenumordinal(datum)
+ encoder.writeint(index)
+
+ def resolveunion(self, schm, datum):
+ index = 0
+ for elemtype in schm.getelementtypes():
+ if validate(elemtype, datum):
+ return index
+ index+=1
+ raise io.AvroTypeException(schm, datum)
\ No newline at end of file
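For reference, the generic reader/writer classes above pair with the renamed Encoder/Decoder classes in io.py much as the existing tests in src/test/py/testio.py exercise them. A minimal sketch (the record schema JSON here is illustrative, not taken from this commit):

    import cStringIO
    import avro.schema as schema
    import avro.io as io
    import avro.genericio as genericio

    # Illustrative schema; any record schema accepted by schema.parse works.
    schm = schema.parse('{"type": "record", "name": "Greeting", "fields": '
                        '[{"name": "message", "type": "string"}]}')

    datum = {"message": unicode("hello")}        # records -> dict, strings -> unicode
    assert genericio.validate(schm, datum)       # datum matches the schema

    buf = cStringIO.StringIO()
    genericio.DatumWriter(schm).write(datum, io.Encoder(buf))

    decoder = io.Decoder(cStringIO.StringIO(buf.getvalue()))
    assert genericio.DatumReader(schm).read(decoder) == datum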
Added: hadoop/avro/trunk/src/py/avro/genericipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/genericipc.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/genericipc.py (added)
+++ hadoop/avro/trunk/src/py/avro/genericipc.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,57 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+""" Uses genericio to write and read data objects."""
+
+import avro.schema as schema
+import avro.genericio as genericio
+import avro.ipc as ipc
+
+class Requestor(ipc.RequestorBase):
+ """Requestor implementation for generic python data."""
+
+ def getdatumwriter(self, schm):
+ return genericio.DatumWriter(schm)
+
+ def getdatumreader(self, schm):
+ return genericio.DatumReader(schm)
+
+ def writerequest(self, schm, req, encoder):
+ self.getdatumwriter(schm).write(req, encoder)
+
+ def readresponse(self, schm, decoder):
+ return self.getdatumreader(schm).read(decoder)
+
+ def readerror(self, schm, decoder):
+ return ipc.AvroRemoteException(self.getdatumreader(schm).read(decoder))
+
+class Responder(ipc.ResponderBase):
+ """Responder implementation for generic python data."""
+
+ def getdatumwriter(self, schm):
+ return genericio.DatumWriter(schm)
+
+ def getdatumreader(self, schm):
+ return genericio.DatumReader(schm)
+
+ def readrequest(self, schm, decoder):
+ return self.getdatumreader(schm).read(decoder)
+
+ def writeresponse(self, schm, response, encoder):
+ self.getdatumwriter(schm).write(response, encoder)
+
+ def writeerror(self, schm, error, encoder):
+ self.getdatumwriter(schm).write(error.getvalue(), encoder)
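On the IPC side, the updated unit tests (see testipc.py below) drive these classes roughly as follows; a sketch assuming PROTOCOL is a parsed avro.protocol.Protocol and a responder is already listening at (host, port):

    import socket
    import avro.ipc as ipc
    import avro.genericipc as genericipc

    sock = socket.socket()
    sock.connect((host, port))                   # host/port of a running server (assumed)
    client = ipc.SocketTransceiver(sock)

    requestor = genericipc.Requestor(PROTOCOL, client)  # handshake now happens on first request
    params = {'greeting': unicode('bob')}
    resp = requestor.request('hello', params)           # call() was renamed to request()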
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Thu Jun 25 18:12:00 2009
@@ -42,7 +42,7 @@
def setschema(self, schema):
pass
- def read(self, valuereader):
+ def read(self, decoder):
"""Read a datum. Traverse the schema, depth-first, reading all leaf values
in the schema into a datum that is returned"""
pass
@@ -52,13 +52,13 @@
def setschema(self, schema):
pass
- def write(self, data, valuewriter):
+ def write(self, data, encoder):
"""Write a datum. Traverse the schema, depth first, writing each leaf value
in the schema from the datum to the output."""
pass
-class ValueReader(object):
+class Decoder(object):
"""Read leaf values."""
def __init__(self, reader):
@@ -108,7 +108,7 @@
def read(self, len):
return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
-class ValueWriter(object):
+class Encoder(object):
"""Write leaf values."""
def __init__(self, writer):
@@ -189,13 +189,13 @@
def __init__(self, schm, writer, dwriter):
self.__writer = writer
- self.__vwriter = ValueWriter(writer)
+ self.__encoder = Encoder(writer)
self.__dwriter = dwriter
self.__dwriter.setschema(schm)
self.__count = 0 #entries in file
self.__blockcount = 0 #entries in current block
self.__buffer = cStringIO.StringIO()
- self.__bufwriter = ValueWriter(self.__buffer)
+ self.__bufwriter = Encoder(self.__buffer)
self.__meta = dict()
self.__sync = uuid.uuid4().bytes
self.__meta["sync"] = self.__sync
@@ -218,7 +218,7 @@
def __writeblock(self):
if self.__blockcount > 0:
self.__writer.write(self.__sync)
- self.__vwriter.writelong(self.__blockcount)
+ self.__encoder.writelong(self.__blockcount)
self.__writer.write(self.__buffer.getvalue())
self.__buffer.truncate(0) #reset
self.__blockcount = 0
@@ -250,8 +250,8 @@
self.__bufwriter.writebytes(str(v))
size = self.__buffer.tell() + 4
self.__writer.write(self.__sync)
- self.__vwriter.writelong(_FOOTER_BLOCK)
- self.__vwriter.writelong(size)
+ self.__encoder.writelong(_FOOTER_BLOCK)
+ self.__encoder.writelong(size)
self.__buffer.flush()
self.__writer.write(self.__buffer.getvalue())
self.__buffer.truncate(0) #reset
@@ -265,7 +265,7 @@
def __init__(self, reader, dreader):
self.__reader = reader
- self.__vreader = ValueReader(reader)
+ self.__decoder = Decoder(reader)
mag = struct.unpack(len(_MAGIC).__str__()+'s',
self.__reader.read(len(_MAGIC)))[0]
if mag != _MAGIC:
@@ -279,11 +279,11 @@
int(ord(self.__reader.read(1)) << 8) +
int(ord(self.__reader.read(1))))
seekpos = self.__reader.seek(self.__length-footersize)
- metalength = self.__vreader.readlong()
+ metalength = self.__decoder.readlong()
self.__meta = dict()
for i in range(0, metalength):
- key = self.__vreader.readutf8()
- self.__meta[key] = self.__vreader.readbytes()
+ key = self.__decoder.readutf8()
+ self.__meta[key] = self.__decoder.readbytes()
self.__sync = self.__meta.get("sync")
self.__count = int(self.__meta.get("count"))
self.__schema = schema.parse(self.__meta.get("schema").encode("utf-8"))
@@ -302,12 +302,12 @@
if self.__reader.tell() == self.__length:
return None
self.__skipsync()
- self.__blockcount = self.__vreader.readlong()
+ self.__blockcount = self.__decoder.readlong()
if self.__blockcount == _FOOTER_BLOCK:
- self.__reader.seek(self.__vreader.readlong()+self.__reader.tell())
+ self.__reader.seek(self.__decoder.readlong()+self.__reader.tell())
self.__blockcount = 0
self.__blockcount-=1
- datum = self.__dreader.read(self.__vreader)
+ datum = self.__dreader.read(self.__decoder)
return datum
def __skipsync(self):
Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Thu Jun 25 18:12:00 2009
@@ -16,14 +16,18 @@
"""Support for inter-process calls."""
-import socket, struct, errno, struct, cStringIO, threading
+import socket, struct, errno, struct, cStringIO, threading, weakref, os
import avro.schema as schema
import avro.protocol as protocol
import avro.io as io
+import avro.reflectio as reflectio
class TransceiverBase(object):
"""Base class for transmitters and receivers of raw binary messages."""
+ def getremotename(self):
+ pass
+
def transceive(self, request):
self.writebuffers(request)
return self.readbuffers()
@@ -51,13 +55,39 @@
class ConnectionClosedException(Exception):
pass
+_PKGNAME = "org.apache.avro.ipc."
+_HANDSHAKE_FILE_DIR = os.path.dirname(__file__).__str__() + os.path.sep
+_HANDSHAKE_REQUEST_SCHEMA = schema.parse(
+ open(_HANDSHAKE_FILE_DIR +
+ "HandshakeRequest.avsc").read())
+_HANDSHAKE_RESPONSE_SCHEMA = schema.parse(
+ open(_HANDSHAKE_FILE_DIR +
+ "HandshakeResponse.avsc").read())
+_HANDSHAKE_REQUESTOR_WRITER = reflectio.ReflectDatumWriter(_PKGNAME,
+ _HANDSHAKE_REQUEST_SCHEMA)
+_HANDSHAKE_REQUESTOR_READER = reflectio.ReflectDatumReader(_PKGNAME,
+ _HANDSHAKE_RESPONSE_SCHEMA)
+_HANDSHAKE_RESPONDER_WRITER = reflectio.ReflectDatumWriter(_PKGNAME,
+ _HANDSHAKE_RESPONSE_SCHEMA)
+_HANDSHAKE_RESPONDER_READER = reflectio.ReflectDatumReader(_PKGNAME,
+ _HANDSHAKE_REQUEST_SCHEMA)
+_HandshakeRequest = reflectio.gettype(_HANDSHAKE_REQUEST_SCHEMA, _PKGNAME)
+_HandshakeResponse = reflectio.gettype(_HANDSHAKE_RESPONSE_SCHEMA, _PKGNAME)
+_HANDSHAKE_MATCH_BOTH = "BOTH"
+_HANDSHAKE_MATCH_CLIENT = "CLIENT"
+_HANDSHAKE_MATCH_NONE = "NONE"
+_REMOTE_HASHES = dict()
+_REMOTE_PROTOCOLS = dict()
+
class RequestorBase(object):
"""Base class for the client side of a protocol interaction."""
def __init__(self, localproto, transceiver):
self.__localproto = localproto
self.__transceiver = transceiver
- self.__remoteproto = self.handshake()
+ self.__established = False
+ self.__sendlocaltext = False
+ self.__remoteproto = None
def getlocal(self):
return self.__localproto
@@ -68,46 +98,80 @@
def gettransceiver(self):
return self.__transceiver
- def handshake(self):
- buf = cStringIO.StringIO()
- vwriter = io.ValueWriter(buf)
- vwriter.writelong(protocol.VERSION)
- vwriter.writeutf8(unicode(self.__localproto.__str__(), 'utf8'))
- response = self.__transceiver.transceive(buf.getvalue())
- vreader = io.ValueReader(cStringIO.StringIO(response))
- remote = vreader.readutf8()
- return protocol.parse(remote)
-
- def call(self, msgname, req):
+ def request(self, msgname, req):
"""Writes a request message and reads a response or error message."""
- m = self.__localproto.getmessages().get(msgname)
+ processed = False
+ while not self.__established or not processed:
+ processed = True
+ buf = cStringIO.StringIO()
+ encoder = io.Encoder(buf)
+ if not self.__established:
+ self.__writehandshake(encoder)
+ m = self.__localproto.getmessages().get(msgname)
+ if m is None:
+ raise schema.AvroException("Not a local message: "+msgname.__str__())
+ encoder.writeutf8(m.getname())
+ self.writerequest(m.getrequest(), req, encoder)
+ response = self.__transceiver.transceive(buf.getvalue())
+ decoder = io.Decoder(cStringIO.StringIO(response))
+ if not self.__established:
+ self.__readhandshake(decoder)
+ m = self.getremote().getmessages().get(msgname)
if m is None:
- raise schema.AvroException("Not a local message: "+msgname.__str__())
- remotemsg = self.__remoteproto.getmessages().get(msgname)
- if remotemsg is None:
raise schema.AvroException("Not a remote message: "+msgname.__str__())
- writer = cStringIO.StringIO()
- vwriter = io.ValueWriter(writer)
- vwriter.writeutf8(m.getname())
-
- self.writerequest(m.getrequest(), req, vwriter)
- response = self.__transceiver.transceive(writer.getvalue())
- vreader = io.ValueReader(cStringIO.StringIO(response))
- self.__remoteproto.getmessages().get(msgname)
- if not vreader.readboolean():
- return self.readresponse(remotemsg.getresponse(), vreader)
+ if not decoder.readboolean():
+ return self.readresponse(m.getresponse(), decoder)
+ else:
+ raise self.readerror(m.geterrors(), decoder)
+
+ def __writehandshake(self, encoder):
+ localhash = self.__localproto.getMD5()
+ remotename = self.__transceiver.getremotename()
+ remotehash = _REMOTE_HASHES.get(remotename)
+ self.__remoteproto = _REMOTE_PROTOCOLS.get(remotehash)
+ if remotehash is None:
+ remotehash = localhash
+ self.__remoteproto = self.__localproto
+ handshake = _HandshakeRequest()
+ handshake.clientHash = localhash
+ handshake.serverHash = remotehash
+ if self.__sendlocaltext:
+ handshake.clientProtocol = unicode(self.__localproto.__str__(), 'utf8')
+ _HANDSHAKE_REQUESTOR_WRITER.write(handshake, encoder)
+
+ def __readhandshake(self, decoder):
+ handshake = _HANDSHAKE_REQUESTOR_READER.read(decoder)
+ print ("Handshake.match of protocol:" +
+ self.__localproto.getname().__str__()+" with:"+
+ self.__transceiver.getremotename().__str__() + " is " +
+ handshake.match.__str__())
+ if handshake.match == _HANDSHAKE_MATCH_BOTH:
+ self.__established = True
+ elif handshake.match == _HANDSHAKE_MATCH_CLIENT:
+ self.__setremote(handshake)
+ self.__established = True
+ elif handshake.match == _HANDSHAKE_MATCH_NONE:
+ self.__setremote(handshake)
+ self.__sendlocaltext = True
else:
- raise self.readerror(remotemsg.geterrors(), vreader)
+ raise schema.AvroException("Unexpected match:
"+handshake.match.__str__())
+
+ def __setremote(self, handshake):
+ self.__remoteproto = protocol.parse(handshake.serverProtocol.__str__())
+ remotehash = handshake.serverHash
+ _REMOTE_HASHES[self.__transceiver.getremotename()] = remotehash
+ if not _REMOTE_PROTOCOLS.has_key(remotehash):
+ _REMOTE_PROTOCOLS[remotehash] = self.__remoteproto
- def writerequest(self, schm, req, vwriter):
+ def writerequest(self, schm, req, encoder):
"""Writes a request message."""
pass
- def readresponse(self, schm, vreader):
+ def readresponse(self, schm, decoder):
"""Reads a response message."""
pass
- def readerror(self, schm, vreader):
+ def readerror(self, schm, decoder):
"""Reads an error message."""
pass
@@ -116,74 +180,102 @@
def __init__(self, localproto):
self.__localproto = localproto
+ self.__remotes = weakref.WeakKeyDictionary()
+ self.__protocols = dict()
+ self.__localhash = self.__localproto.getMD5()
+ self.__protocols[self.__localhash] = self.__localproto
def getlocal(self):
return self.__localproto
- def handshake(self, server):
- """Returns the remote protocol."""
- vreader = io.ValueReader(cStringIO.StringIO(server.readbuffers()))
- out = cStringIO.StringIO()
- vwriter = io.ValueWriter(out)
- version = vreader.readlong()
- if version != protocol.VERSION:
- raise schema.AvroException("Incompatible request version: "
- +version.__str__())
- proto = vreader.readutf8()
- remote = protocol.parse(proto)
- vwriter.writeutf8(unicode(self.__localproto.__str__(), 'utf8'))
- server.writebuffers(out.getvalue())
-
- return remote
-
- def call(self, remoteproto, input):
+ def respond(self, transceiver):
+ """Called by a server to deserialize a request, compute and serialize
+ * a response or error."""
+ transreq = transceiver.readbuffers()
+ reader = cStringIO.StringIO(transreq)
+ decoder = io.Decoder(reader)
buf = cStringIO.StringIO()
- vwriter = io.ValueWriter(buf)
+ encoder = io.Encoder(buf)
+ error = None
+
try:
- reader = cStringIO.StringIO(input)
- vreader = io.ValueReader(reader)
- msgname = vreader.readutf8()
+ remoteproto = self.__handshake(transceiver, decoder, encoder)
+ if remoteproto is None: #handshake failed
+ return buf.getvalue()
+
+ #read request using remote protocol specification
+ msgname = decoder.readutf8()
m = remoteproto.getmessages().get(msgname)
if m is None:
raise schema.AvroException("No such remote message:
"+msgname.__str__())
+ req = self.readrequest(m.getrequest(), decoder)
- req = self.readrequest(m.getrequest(), vreader)
+ #read response using local protocol specification
m = self.__localproto.getmessages().get(msgname)
if m is None:
raise schema.AvroException("No such local message: "+msgname.__str__())
- error = None
try:
response = self.invoke(m, req)
except AvroRemoteException, e:
error = e
except Exception, e:
error = AvroRemoteException(unicode(e.__str__()))
- vwriter.writeboolean(error is not None)
+ encoder.writeboolean(error is not None)
if error is None:
- self.writeresponse(m.getresponse(), response, vwriter)
+ self.writeresponse(m.getresponse(), response, encoder)
else:
- self.writeerror(m.geterrors(), error, vwriter)
+ self.writeerror(m.geterrors(), error, encoder)
except schema.AvroException, e:
error = AvroRemoteException(unicode(e.__str__()))
buf = cStringIO.StringIO()
- vwriter = io.ValueWriter(buf)
- vwriter.writeboolean(True)
- self.writeerror(protocol._SYSTEM_ERRORS, error, vwriter)
-
+ encoder = io.Encoder(buf)
+ encoder.writeboolean(True)
+ self.writeerror(protocol._SYSTEM_ERRORS, error, encoder)
+
return buf.getvalue()
+
+ def __handshake(self, transceiver, decoder, encoder):
+ remoteproto = self.__remotes.get(transceiver)
+ if remoteproto != None:
+ return remoteproto #already established
+ request = _HANDSHAKE_RESPONDER_READER.read(decoder)
+ remoteproto = self.__protocols.get(request.clientHash)
+ if remoteproto is None and request.clientProtocol is not None:
+ remoteproto = protocol.parse(request.clientProtocol)
+ self.__protocols[request.clientHash] = remoteproto
+ if remoteproto is not None:
+ self.__remotes[transceiver] = remoteproto
+ response = _HandshakeResponse()
+
+ if self.__localhash == request.serverHash:
+ if remoteproto is None:
+ response.match = _HANDSHAKE_MATCH_NONE
+ else:
+ response.match = _HANDSHAKE_MATCH_BOTH
+ else:
+ if remoteproto is None:
+ response.match = _HANDSHAKE_MATCH_NONE
+ else:
+ response.match = _HANDSHAKE_MATCH_CLIENT
+ if response.match != _HANDSHAKE_MATCH_BOTH:
+ response.serverProtocol = unicode(self.__localproto.__str__(), "utf8")
+ response.serverHash = self.__localhash
+ _HANDSHAKE_RESPONDER_WRITER.write(response, encoder)
+ return remoteproto
+
def invoke(self, msg, req):
pass
- def readrequest(self, schm, vreader):
+ def readrequest(self, schm, decoder):
"""Reads a request message."""
pass
- def writeresponse(self, schm, response, vwriter):
+ def writeresponse(self, schm, response, encoder):
"""Writes a response message."""
pass
- def writeerror(self, schm, error, vwriter):
+ def writeerror(self, schm, error, encoder):
"""Writes an error message."""
pass
@@ -195,6 +287,9 @@
def __init__(self, sock):
self.__sock = sock
+ def getremotename(self):
+ return self.__sock.getsockname()
+
def readbuffers(self):
msg = cStringIO.StringIO()
while True:
@@ -293,13 +388,13 @@
def __run(self):
try:
- remoteproto = self.__responder.handshake(self)
- while True:
- buf = self.readbuffers()
- buf = self.__responder.call(remoteproto, buf)
- self.writebuffers(buf)
- except ConnectionClosedException, e:
- print "Closed:", self.__thread.getName()
- return
- finally:
- self.close()
+ try:
+ while True:
+ self.writebuffers(self.__responder.respond(self))
+ except ConnectionClosedException, e:
+ print "Closed:", self.__thread.getName()
+ return
+ finally:
+ self.close()
+ except Exception, ex:
+ print "Unexpected error", ex
Modified: hadoop/avro/trunk/src/py/avro/protocol.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/protocol.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/protocol.py (original)
+++ hadoop/avro/trunk/src/py/avro/protocol.py Thu Jun 25 18:12:00 2009
@@ -14,7 +14,7 @@
#See the License for the specific language governing permissions and
#limitations under the License.
-import cStringIO
+import cStringIO, md5
import simplejson
import avro.schema as schema
@@ -31,6 +31,7 @@
self.__messages = dict()
self.__name = name
self.__namespace = namespace
+ self.__md5 = None
def getname(self):
return self.__name
@@ -44,6 +45,12 @@
def getmessages(self):
return self.__messages
+ def getMD5(self):
+ """Return the MD5 hash of the text of this protocol."""
+ if self.__md5 is None:
+ self.__md5 = md5.new(self.__str__()).digest()
+ return self.__md5
+
class Message(object):
""" A Protocol message."""
def __init__(self, proto, name, request, response, errors):
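The new getMD5() simply caches the (Python 2) md5 digest of the protocol's JSON text; for any Protocol instance p it is equivalent to:

    import md5
    digest = md5.new(p.__str__()).digest()   # 16 raw bytes; the same value p.getMD5() returns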
Added: hadoop/avro/trunk/src/py/avro/reflectio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/reflectio.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/reflectio.py (added)
+++ hadoop/avro/trunk/src/py/avro/reflectio.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,133 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+"""Define Record schema and protocol classes at runtime. This can be used
+to invoke the methods on Protocol proxy directly."""
+
+import avro.schema as schema
+import avro.io as io
+import avro.genericio as genericio
+
+#TODO pkgname should not be passed, instead classes should be constructed
+#based on schema namespace
+
+def _validatearray(schm, pkgname, object):
+ if not isinstance(object, list):
+ return False
+ for elem in object:
+ if not validate(schm.getelementtype(), pkgname, elem):
+ return False
+ return True
+
+def _validatemap(schm, pkgname, object):
+ if not isinstance(object, dict):
+ return False
+ for k,v in object.items():
+ if not validate(schm.getvaluetype(), pkgname, v):
+ return False
+ return True
+
+def _validaterecord(schm, pkgname, object):
+ if not isinstance(object, gettype(schm, pkgname)):
+ return False
+ for field,fieldschema in schm.getfields():
+ data = object.__getattribute__(field)
+ if not validate(fieldschema, pkgname, data):
+ return False
+ return True
+
+def _validateunion(schm, pkgname, object):
+ for elemtype in schm.getelementtypes():
+ if validate(elemtype, pkgname, object):
+ return True
+ return False
+
+_validatefn = {
+ schema.NULL : lambda schm, pkgname, object: object is None,
+ schema.BOOLEAN : lambda schm, pkgname, object: isinstance(object, bool),
+ schema.STRING : lambda schm, pkgname, object: isinstance(object, unicode),
+ schema.FLOAT : lambda schm, pkgname, object: isinstance(object, float),
+ schema.DOUBLE : lambda schm, pkgname, object: isinstance(object, float),
+ schema.BYTES : lambda schm, pkgname, object: isinstance(object, str),
+ schema.INT : lambda schm, pkgname, object: ((isinstance(object, long) or
+ isinstance(object, int)) and
+ io._INT_MIN_VALUE <= object <= io._INT_MAX_VALUE),
+ schema.LONG : lambda schm, pkgname, object: ((isinstance(object, long) or
+ isinstance(object, int)) and
+ io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
+ schema.ENUM : lambda schm, pkgname, object:
+ schm.getenumsymbols().__contains__(object),
+ schema.FIXED : lambda schm, pkgname, object:
+ (isinstance(object, str) and
+ len(object) == schm.getsize()),
+ schema.ARRAY : _validatearray,
+ schema.MAP : _validatemap,
+ schema.RECORD : _validaterecord,
+ schema.UNION : _validateunion
+ }
+
+def validate(schm, pkgname, object):
+ """Returns True if a python datum matches a schema."""
+ fn = _validatefn.get(schm.gettype())
+ if fn is not None:
+ return fn(schm, pkgname, object)
+ else:
+ return False
+
+def gettype(recordschm, pkgname, base=object):
+ """Returns the type with classname as recordschm name and pkg name as
pkgname.
+ If type does not exist creates a new type."""
+ clazzname = pkgname + recordschm.getname()
+ clazz = globals().get(clazzname)
+ if clazz is None:
+ clazz = type(str(clazzname),(base,),{})
+ for field,fieldschema in recordschm.getfields():
+ setattr(clazz, field, None)
+ globals()[clazzname] = clazz
+ return clazz
+
+class ReflectDatumReader(genericio.DatumReader):
+ """DatumReader for arbitrary python classes."""
+
+ def __init__(self, pkgname, schm=None):
+ genericio.DatumReader.__init__(self, schm)
+ self.__pkgname = pkgname
+
+ def readrecord(self, schm, decoder):
+ type = gettype(schm, self.__pkgname)
+ result = type()
+ for field,fieldschema in schm.getfields():
+ setattr(result, field, self.readdata(fieldschema, decoder))
+ return result
+
+class ReflectDatumWriter(genericio.DatumWriter):
+ """DatumWriter for arbitrary python classes."""
+
+ def __init__(self, pkgname, schm=None):
+ genericio.DatumWriter.__init__(self, schm)
+ self.__pkgname = pkgname
+
+ def writerecord(self, schm, datum, encoder):
+ for field,fieldschema in schm.getfields():
+ self.writedata(fieldschema, getattr(datum, field), encoder)
+
+ def resolveunion(self, schm, datum):
+ index = 0
+ for elemtype in schm.getelementtypes():
+ if validate(elemtype, self.__pkgname, datum):
+ return index
+ index+=1
+ raise io.AvroTypeException(schm, datum)
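A short sketch of how gettype() manufactures record classes at runtime (the schema JSON and package name here are illustrative; the tests below obtain their schemata from the protocol instead):

    import avro.schema as schema
    import avro.reflectio as reflectio

    recschm = schema.parse('{"type": "record", "name": "TestRecord", "fields": '
                           '[{"name": "name", "type": "string"}]}')

    TestRecord = reflectio.gettype(recschm, "org.apache.avro.test.")  # class built on first call
    rec = TestRecord()
    rec.name = unicode("bob")
    assert reflectio.validate(recschm, "org.apache.avro.test.", rec)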
Added: hadoop/avro/trunk/src/py/avro/reflectipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/reflectipc.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/reflectipc.py (added)
+++ hadoop/avro/trunk/src/py/avro/reflectipc.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,107 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements. See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership. The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+""" Uses reflectio to write and read data objects. Provides support for remote
+method invocation via protocol's proxy instance."""
+
+import avro.schema as schema
+import avro.ipc as ipc
+import avro.genericipc as genericipc
+import avro.reflectio as reflectio
+
+#TODO pkgname should not be passed, instead classes should be constructed
+#based on schema namespace
+
+class _Proxy(object):
+
+ class _MethodInvoker(object):
+
+ def __init__(self, requestor, methodname):
+ self.requestor = requestor
+ self.methodname = methodname
+
+ def __call__(self, *args):
+ return self.requestor.request(self.methodname, args)
+
+ def __init__(self, requestor):
+ self.requestor = requestor
+ self.invokers = dict()
+ msgs = self.requestor.getlocal().getmessages()
+ for methodname, method in msgs.items():
+ self.invokers[methodname] = self._MethodInvoker(
+ self.requestor, methodname)
+
+ def __getattribute__(self, attr):
+ attrhandle = object.__getattribute__(self, "invokers").get(attr)
+ if attrhandle is None:
+ attrhandle = object.__getattribute__(self, attr)
+ return attrhandle
+
+def getclient(protocol, transceiver):
+ """Create a proxy instance whose methods invoke RPCs."""
+ requestor = ReflectRequestor(protocol, transceiver)
+ return _Proxy(requestor)
+
+class ReflectRequestor(genericipc.Requestor):
+
+ def __init__(self, localproto, transceiver):
+ ipc.RequestorBase.__init__(self, localproto, transceiver)
+ self.__pkgname = localproto.getnamespace() + "."
+
+ def getdatumwriter(self, schm):
+ return reflectio.ReflectDatumWriter(self.__pkgname, schm)
+
+ def getdatumreader(self, schm):
+ return reflectio.ReflectDatumReader(self.__pkgname, schm)
+
+ def writerequest(self, schm, req, encoder):
+ index = 0
+ for arg in req:
+ argschm = schm.getfields()[index][1]
+ genericipc.Requestor.writerequest(self, argschm, arg, encoder)
+
+ def readerror(self, schm, decoder):
+ return self.getdatumreader(schm).read(decoder)
+
+class ReflectResponder(genericipc.Responder):
+
+ def __init__(self, localproto, impl):
+ genericipc.Responder.__init__(self, localproto)
+ self.__pkgname = localproto.getnamespace() + "."
+ self.__impl = impl
+
+ def getdatumwriter(self, schm):
+ return reflectio.ReflectDatumWriter(self.__pkgname, schm)
+
+ def getdatumreader(self, schm):
+ return reflectio.ReflectDatumReader(self.__pkgname, schm)
+
+ def readrequest(self, schm, decoder):
+ req = list()
+ for field, fieldschm in schm.getfields():
+ req.append(genericipc.Responder.readrequest(self, fieldschm, decoder))
+ return req
+
+ def writeerror(self, schm, error, encoder):
+ self.getdatumwriter(schm).write(error, encoder)
+
+ def invoke(self, msg, req):
+ method = self.__impl.__getattribute__(msg.getname())
+ if method is None:
+ raise AttributeError("No method with name "+ method)
+ resp = method(*req)
+ return resp
+
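As the reflect and interop tests below show, getclient() returns a proxy whose attributes invoke protocol messages over RPC; a sketch assuming PROTOCOL and an implementation class TestImpl as in the tests:

    import socket
    import avro.ipc as ipc
    import avro.reflectipc as reflectipc

    # Server side: expose an implementation object behind a socket server.
    responder = reflectipc.ReflectResponder(PROTOCOL, TestImpl())
    server = ipc.SocketServer(responder, ('localhost', 0))

    # Client side: a proxy whose method calls become RPCs.
    sock = socket.socket()
    sock.connect(server.getaddress())
    proxy = reflectipc.getclient(PROTOCOL, ipc.SocketTransceiver(sock))
    resp = proxy.hello(unicode("bob"))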
Modified: hadoop/avro/trunk/src/test/py/interoptests.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (original)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Thu Jun 25 18:12:00 2009
@@ -18,8 +18,9 @@
import avro.schema as schema
import avro.io as io
import avro.ipc as ipc
-import avro.generic as generic
-import avro.reflect as reflect
+import avro.genericio as genericio
+import avro.reflectio as reflectio
+import avro.reflectipc as reflectipc
import testio, testipc, testioreflect, testipcreflect
_DATAFILE_DIR = "build/test/data-files/"
@@ -27,8 +28,8 @@
class TestGeneratedFiles(unittest.TestCase):
- def __init__(self, methodName, validator=generic.validate,
- datumreader=generic.DatumReader):
+ def __init__(self, methodName, validator=genericio.validate,
+ datumreader=genericio.DatumReader):
unittest.TestCase.__init__(self, methodName)
self.__validator = validator
self.__datumreader = datumreader
@@ -61,7 +62,7 @@
sock.connect(("localhost", int(port)))
client = ipc.SocketTransceiver(sock)
testproto = testipcreflect.TestProtocol("testipc")
- testproto.proxy = reflect.getclient(testipc.PROTOCOL, client)
+ testproto.proxy = reflectipc.getclient(testipc.PROTOCOL, client)
testproto.checkhello()
testproto.checkecho()
testproto.checkechobytes()
@@ -70,7 +71,7 @@
def _interopserver():
addr = ('localhost', 0)
- responder = reflect.ReflectResponder(testipc.PROTOCOL,
+ responder = reflectipc.ReflectResponder(testipc.PROTOCOL,
testipcreflect.TestImpl())
server = ipc.SocketServer(responder, addr)
file = open(_SERVER_PORTS_DIR+"py-port", "w")
Modified: hadoop/avro/trunk/src/test/py/testio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testio.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testio.py (original)
+++ hadoop/avro/trunk/src/test/py/testio.py Thu Jun 25 18:12:00 2009
@@ -17,7 +17,7 @@
import unittest, random, cStringIO, time, sys, os, struct
import avro.schema as schema
import avro.io as io
-import avro.generic as generic
+import avro.genericio as genericio
_DIR = "build/test/"
_FILE = _DIR +"test.py.avro"
@@ -95,9 +95,9 @@
class TestSchema(unittest.TestCase):
- def __init__(self, methodName, validator=generic.validate,
- dwriter=generic.DatumWriter,
- dreader=generic.DatumReader, random=RandomData,
+ def __init__(self, methodName, validator=genericio.validate,
+ dwriter=genericio.DatumWriter,
+ dreader=genericio.DatumReader, random=RandomData,
assertdata=True):
unittest.TestCase.__init__(self, methodName)
self.__validator = validator
@@ -185,10 +185,10 @@
self.assertTrue(self.__validator(schm, datum))
w = self.__datumwriter(schm)
writer = cStringIO.StringIO()
- w.write(datum, io.ValueWriter(writer))
+ w.write(datum, io.Encoder(writer))
r = self.__datumreader(schm)
reader = cStringIO.StringIO(writer.getvalue())
- ob = r.read(io.ValueReader(reader))
+ ob = r.read(io.Decoder(reader))
if self.__assertdata:
self.assertEquals(datum, ob)
@@ -215,7 +215,7 @@
file = sys.argv[2]
count = int(sys.argv[3])
randomData = RandomData(schm)
- dw = io.DataFileWriter(schm, open(file, 'wb'), generic.DatumWriter())
+ dw = io.DataFileWriter(schm, open(file, 'wb'), genericio.DatumWriter())
for i in range(0,count):
dw.append(randomData.next())
dw.close()
Modified: hadoop/avro/trunk/src/test/py/testioreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testioreflect.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testioreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testioreflect.py Thu Jun 25 18:12:00 2009
@@ -15,21 +15,19 @@
#limitations under the License.
import avro.schema as schema
-import avro.reflect as reflect
-import avro.generic as generic
+import avro.reflectio as reflectio
import testio
_PKGNAME = "org.apache.avro.test."
def dyvalidator(schm, object):
- return reflect.validate(schm, _PKGNAME, object)
+ return reflectio.validate(schm, _PKGNAME, object)
class DyRandomData(testio.RandomData):
def nextdata(self, schm, d=0):
if schm.gettype() == schema.RECORD:
- name = schm.getname()
- clazz = reflect.gettype(name, _PKGNAME)
+ clazz = reflectio.gettype(schm, _PKGNAME)
result = clazz()
for field,fieldschema in schm.getfields():
result.__setattr__(field, self.nextdata(fieldschema,d))
@@ -37,15 +35,15 @@
else:
return testio.RandomData.nextdata(self, schm, d)
-class ReflectDReader(reflect.ReflectDatumReader):
+class ReflectDReader(reflectio.ReflectDatumReader):
def __init__(self, schm=None):
- reflect.ReflectDatumReader.__init__(self, _PKGNAME, schm)
+ reflectio.ReflectDatumReader.__init__(self, _PKGNAME, schm)
-class ReflectDWriter(reflect.ReflectDatumWriter):
+class ReflectDWriter(reflectio.ReflectDatumWriter):
def __init__(self, schm=None):
- reflect.ReflectDatumWriter.__init__(self, _PKGNAME, schm)
+ reflectio.ReflectDatumWriter.__init__(self, _PKGNAME, schm)
class TestSchema(testio.TestSchema):
Modified: hadoop/avro/trunk/src/test/py/testipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipc.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipc.py (original)
+++ hadoop/avro/trunk/src/test/py/testipc.py Thu Jun 25 18:12:00 2009
@@ -17,7 +17,7 @@
import unittest, socket, struct
import testio
import avro.ipc as ipc
-import avro.generic as generic
+import avro.genericipc as genericipc
import avro.protocol as protocol
import avro.schema as schema
@@ -25,7 +25,7 @@
class TestProtocol(unittest.TestCase):
- class TestResponder(generic.Responder):
+ class TestResponder(genericipc.Responder):
def __init__(self):
ipc.ResponderBase.__init__(self, PROTOCOL)
@@ -50,6 +50,8 @@
raise schema.AvroException("unexpected message:",msg.getname());
def testipc(self):
+ self.server = None
+ self.requestor = None
try:
self.checkstartserver()
self.checkhello()
@@ -65,12 +67,12 @@
sock = socket.socket()
sock.connect(self.server.getaddress())
client = ipc.SocketTransceiver(sock)
- self.requestor = generic.Requestor(PROTOCOL, client)
+ self.requestor = genericipc.Requestor(PROTOCOL, client)
def checkhello(self):
params = dict()
params['greeting'] = unicode('bob')
- resp = self.requestor.call('hello', params)
+ resp = self.requestor.request('hello', params)
self.assertEquals('goodbye',resp)
def checkecho(self):
@@ -80,7 +82,7 @@
record['hash'] = struct.pack('16s','0123456789012345')
params = dict()
params['record'] = record
- echoed = self.requestor.call('echo', params)
+ echoed = self.requestor.request('echo', params)
self.assertEquals(record,echoed)
def checkechobytes(self):
@@ -88,17 +90,18 @@
rand = testio.RandomData(schema._BytesSchema())
data = rand.next()
params['data'] = data
- echoed = self.requestor.call('echoBytes', params)
+ echoed = self.requestor.request('echoBytes', params)
self.assertEquals(data,echoed)
def checkerror(self):
error = None
try:
- self.requestor.call("error", dict())
+ self.requestor.request("error", dict())
except ipc.AvroRemoteException, e:
error = e
self.assertNotEquals(error, None)
self.assertEquals("an error", error.getvalue().get("message"))
def checkshutdown(self):
- self.server.close()
+ if self.server is not None:
+ self.server.close()
Modified: hadoop/avro/trunk/src/test/py/testipcreflect.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipcreflect.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipcreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testipcreflect.py Thu Jun 25 18:12:00 2009
@@ -16,13 +16,15 @@
import socket, struct
import avro.schema as schema
-import avro.reflect as reflect
+import avro.reflectio as reflectio
+import avro.reflectipc as reflectipc
import avro.ipc as ipc
import testipc, testio, testioreflect
-TestRecord = reflect.gettype("TestRecord", testioreflect._PKGNAME)
-TestError = reflect.gettype("TestError", testioreflect._PKGNAME,
- ipc.AvroRemoteException)
+TestRecord = reflectio.gettype(testipc.PROTOCOL.gettypes().get("TestRecord"),
+ testioreflect._PKGNAME)
+TestError = reflectio.gettype(testipc.PROTOCOL.gettypes().get("TestError"),
+ testioreflect._PKGNAME, ipc.AvroRemoteException)
class TestImpl(object):
@@ -44,12 +46,12 @@
def checkstartserver(self):
addr = ('localhost', 0)
- responder = reflect.ReflectResponder(testipc.PROTOCOL, TestImpl())
+ responder = reflectipc.ReflectResponder(testipc.PROTOCOL, TestImpl())
self.server = ipc.SocketServer(responder, addr)
sock = socket.socket()
sock.connect(self.server.getaddress())
client = ipc.SocketTransceiver(sock)
- self.proxy = reflect.getclient(testipc.PROTOCOL, client)
+ self.proxy = reflectipc.getclient(testipc.PROTOCOL, client)
def checkhello(self):
resp = self.proxy.hello(unicode("bob"))
@@ -77,6 +79,3 @@
error = e
self.assertNotEquals(error, None)
self.assertEquals("an error", error.message)
-
- def checkshutdown(self):
- self.server.close()
\ No newline at end of file