Author: cutting
Date: Fri Jan 8 19:06:53 2010
New Revision: 897302
URL: http://svn.apache.org/viewvc?rev=897302&view=rev
Log:
AVRO-289. Fix Python schema resolution. Contributed by Jeff Hammerbacher.
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/py/avro/io.py
hadoop/avro/trunk/src/py/avro/schema.py
hadoop/avro/trunk/src/test/py/test_io.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=897302&r1=897301&r2=897302&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Fri Jan 8 19:06:53 2010
@@ -255,6 +255,9 @@
AVRO-292. Fix Python skipping of ints and longs.
(Jeff Hammerbacher via cutting)
+ AVRO-289. Fix Python schema resolution.
+ (Jeff Hammerbacher via cutting)
+
Avro 1.2.0 (14 October 2009)
INCOMPATIBLE CHANGES
Modified: hadoop/avro/trunk/src/py/avro/io.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=897302&r1=897301&r2=897302&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Fri Jan 8 19:06:53 2010
@@ -65,11 +65,10 @@
% (datum, expected_schema)
schema.AvroException.__init__(self, fail_msg)
-class SchemaMatchException(schema.AvroException):
- """Raised when writer's and reader's schema do not match."""
- def __init__(self, writers_schema, readers_schema):
- fail_msg = "Writer's schema %s and Reader's schema %s do not match."\
- % (writers_schema, readers_schema)
+class SchemaResolutionException(schema.AvroException):
+ def __init__(self, fail_msg, writers_schema=None, readers_schema=None):
+ if writers_schema: fail_msg += "\nWriter's Schema: %s" % writers_schema
+ if readers_schema: fail_msg += "\nReader's Schema: %s" % readers_schema
schema.AvroException.__init__(self, fail_msg)
#
@@ -249,6 +248,10 @@
# read-only properties
writer = property(lambda self: self._writer)
+ def write(self, datum):
+ """Write an arbitrary datum."""
+ self.writer.write(datum)
+
def write_null(self, datum):
"""
null is written as zero bytes
@@ -261,9 +264,9 @@
whose value is either 0 (false) or 1 (true).
"""
if datum:
- self.writer.write(chr(1))
+ self.write(chr(1))
else:
- self.writer.write(chr(0))
+ self.write(chr(0))
def write_int(self, datum):
"""
@@ -277,9 +280,9 @@
"""
datum = (datum << 1) ^ (datum >> 63)
while (datum & ~0x7F) != 0:
- self.writer.write(chr((datum & 0x7f) | 0x80))
+ self.write(chr((datum & 0x7f) | 0x80))
datum >>= 7
- self.writer.write(chr(datum))
+ self.write(chr(datum))
def write_float(self, datum):
"""
@@ -288,10 +291,10 @@
Java's floatToIntBits and then encoded in little-endian format.
"""
bits = STRUCT_INT.unpack(STRUCT_FLOAT.pack(datum))[0]
- self.writer.write(chr((bits) & 0xFF))
- self.writer.write(chr((bits >> 8) & 0xFF))
- self.writer.write(chr((bits >> 16) & 0xFF))
- self.writer.write(chr((bits >> 24) & 0xFF))
+ self.write(chr((bits) & 0xFF))
+ self.write(chr((bits >> 8) & 0xFF))
+ self.write(chr((bits >> 16) & 0xFF))
+ self.write(chr((bits >> 24) & 0xFF))
def write_double(self, datum):
"""
@@ -300,21 +303,21 @@
Java's doubleToLongBits and then encoded in little-endian format.
"""
bits = STRUCT_LONG.unpack(STRUCT_DOUBLE.pack(datum))[0]
- self.writer.write(chr((bits) & 0xFF))
- self.writer.write(chr((bits >> 8) & 0xFF))
- self.writer.write(chr((bits >> 16) & 0xFF))
- self.writer.write(chr((bits >> 24) & 0xFF))
- self.writer.write(chr((bits >> 32) & 0xFF))
- self.writer.write(chr((bits >> 40) & 0xFF))
- self.writer.write(chr((bits >> 48) & 0xFF))
- self.writer.write(chr((bits >> 56) & 0xFF))
+ self.write(chr((bits) & 0xFF))
+ self.write(chr((bits >> 8) & 0xFF))
+ self.write(chr((bits >> 16) & 0xFF))
+ self.write(chr((bits >> 24) & 0xFF))
+ self.write(chr((bits >> 32) & 0xFF))
+ self.write(chr((bits >> 40) & 0xFF))
+ self.write(chr((bits >> 48) & 0xFF))
+ self.write(chr((bits >> 56) & 0xFF))
def write_bytes(self, datum):
"""
Bytes are encoded as a long followed by that many bytes of data.
"""
self.write_long(len(datum))
- self.writer.write(struct.pack('%ds' % len(datum), datum))
+ self.write(struct.pack('%ds' % len(datum), datum))
def write_utf8(self, datum):
"""
@@ -324,10 +327,6 @@
datum = datum.encode("utf-8")
self.write_bytes(datum)
- def write(self, datum):
- """Write an abritrary datum."""
- self.writer.write(datum)
-
#
# DatumReader/Writer
#
@@ -407,14 +406,16 @@
def read_data(self, writers_schema, readers_schema, decoder):
# schema matching
if not DatumReader.match_schemas(writers_schema, readers_schema):
- raise SchemaMatchException(writers_schema, readers_schema)
+ fail_msg = 'Schemas do not match.'
+ raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
# schema resolution: reader's schema is a union, writer's schema is not
if writers_schema.type != 'union' and readers_schema.type == 'union':
for s in readers_schema.schemas:
if DatumReader.match_schemas(writers_schema, s):
return self.read_data(writers_schema, s, decoder)
- raise SchemaMatchException(writers_schema, readers_schema)
+ fail_msg = 'Schemas do not match.'
+ raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
# function dispatch for reading data based on type of writer's schema
if writers_schema.type == 'null':
@@ -501,10 +502,10 @@
index_of_symbol = decoder.read_int()
read_symbol = writers_schema.symbols[index_of_symbol]
- # TODO(hammer): figure out what "unset" means for resolution
# schema resolution
if read_symbol not in readers_schema.symbols:
- pass # 'unset' here
+ fail_msg = "Symbol %s not present in Reader's Schema" % read_symbol
+ raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
return read_symbol
@@ -642,11 +643,13 @@
writers_fields_dict = writers_schema.fields_dict
for field_name, field in readers_fields_dict.items():
if not writers_fields_dict.has_key(field_name):
- if field.default is not None:
+ if field.has_default:
field_val = self._read_default_value(field.type, field.default)
read_record[field.name] = field_val
else:
- pass # 'unset' here
+ fail_msg = 'No default value for field %s' % field_name
+ raise SchemaResolutionException(fail_msg, writers_schema,
+ readers_schema)
return read_record
def skip_record(self, writers_schema, decoder):
@@ -657,9 +660,9 @@
"""
Basically a JSON Decoder?
"""
- if field_schema.type in 'null':
+ if field_schema.type == 'null':
return None
- elif field_schema.type in 'boolean':
+ elif field_schema.type == 'boolean':
return bool(default_value)
elif field_schema.type == 'int':
return int(default_value)
Modified: hadoop/avro/trunk/src/py/avro/schema.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/schema.py?rev=897302&r1=897301&r2=897302&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/schema.py (original)
+++ hadoop/avro/trunk/src/py/avro/schema.py Fri Jan 8 19:06:53 2010
@@ -187,10 +187,7 @@
Name.make_fullname(self.name, self.namespace))
class Field(object):
- def __init__(self, type, name, default=None, order=None, names=None):
- self._props = {}
- self._type_from_names = False
-
+ def __init__(self, type, name, has_default, default=None, order=None,
names=None):
# Ensure valid ctor args
if not name:
fail_msg = 'Fields must have a non-empty name.'
@@ -203,6 +200,9 @@
raise SchemaParseException(fail_msg)
# add members
+ self._props = {}
+ self._type_from_names = False
+ self._has_default = has_default
if (isinstance(type, basestring) and names is not None
and names.has_key(type)):
type_schema = names[type]
@@ -216,13 +216,14 @@
self.set_prop('type', type_schema)
self.set_prop('name', name)
# TODO(hammer): check to ensure default is valid
- if default is not None: self.set_prop('default', default)
+ if has_default: self.set_prop('default', default)
if order is not None: self.set_prop('order', order)
# read-only properties
type = property(lambda self: self.get_prop('type'))
name = property(lambda self: self.get_prop('name'))
default = property(lambda self: self.get_prop('default'))
+ has_default = property(lambda self: self._has_default)
order = property(lambda self: self.get_prop('order'))
props = property(lambda self: self._props)
type_from_names = property(lambda self: self._type_from_names)
@@ -463,9 +464,16 @@
if hasattr(field, 'get') and callable(field.get):
type = field.get('type')
name = field.get('name')
- default = field.get('default')
+
+ # null values can have a default value of None
+ has_default = False
+ default = None
+ if field.has_key('default'):
+ has_default = True
+ default = field.get('default')
+
order = field.get('order')
- new_field = Field(type, name, default, order, names)
+ new_field = Field(type, name, has_default, default, order, names)
# make sure field name has not been used yet
if new_field.name in field_names:
fail_msg = 'Field name %s already in use.' % new_field.name
Modified: hadoop/avro/trunk/src/test/py/test_io.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/test_io.py?rev=897302&r1=897301&r2=897302&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/test_io.py (original)
+++ hadoop/avro/trunk/src/test/py/test_io.py Fri Jan 8 19:06:53 2010
@@ -50,7 +50,7 @@
""", {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
)
-BINARY_INT_ENCODINGS = (
+BINARY_ENCODINGS = (
(0, '00'),
(-1, '01'),
(1, '02'),
@@ -62,6 +62,37 @@
(-8193, '81 80 01'),
)
+DEFAULT_VALUE_EXAMPLES = (
+ ('"null"', 'null', None),
+ ('"boolean"', 'true', True),
+ ('"string"', '"foo"', u'foo'),
+ ('"bytes"', '"\u00FF\u00FF"', u'\xff\xff'),
+ ('"int"', '5', 5),
+ ('"long"', '5', 5L),
+ ('"float"', '1.1', 1.1),
+ ('"double"', '1.1', 1.1),
+ ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', u'\xff\xff'),
+ ('{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}', '"FOO"', 'FOO'),
+ ('{"type": "array", "items": "int"}', '[1, 2, 3]', [1, 2, 3]),
+ ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {'a': 1, 'b': 2}),
+ ('["int", "null"]', '5', 5),
+ ('{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}',
+ '{"A": 5}', {'A': 5}),
+)
+
+LONG_RECORD_SCHEMA = schema.parse("""\
+ {"type": "record",
+ "name": "Test",
+ "fields": [{"name": "A", "type": "int"},
+ {"name": "B", "type": "int"},
+ {"name": "C", "type": "int"},
+ {"name": "D", "type": "int"},
+ {"name": "E", "type": "int"},
+ {"name": "F", "type": "int"},
+ {"name": "G", "type": "int"}]}""")
+
+LONG_RECORD_DATUM = {'A': 1, 'B': 2, 'C': 3, 'D': 4, 'E': 5, 'F': 6, 'G': 7}
+
def avro_hexlify(reader):
"""Return the hex value, as a string, of a binary-encoded int or long."""
bytes = []
@@ -72,222 +103,205 @@
bytes.append(hexlify(current_byte))
return ' '.join(bytes)
+def print_test_name(test_name):
+ print ''
+ print test_name
+ print '=' * len(test_name)
+ print ''
+
+def write_datum(datum, writers_schema):
+ writer = cStringIO.StringIO()
+ encoder = io.BinaryEncoder(writer)
+ datum_writer = io.DatumWriter(writers_schema)
+ datum_writer.write(datum, encoder)
+ return writer, encoder, datum_writer
+
+def read_datum(buffer, writers_schema, readers_schema=None):
+ reader = cStringIO.StringIO(buffer.getvalue())
+ decoder = io.BinaryDecoder(reader)
+ datum_reader = io.DatumReader(writers_schema, readers_schema)
+ return datum_reader.read(decoder)
+
+def check_binary_encoding(number_type):
+ print_test_name('TEST BINARY %s ENCODING' % number_type.upper())
+ correct = 0
+ for datum, hex_encoding in BINARY_ENCODINGS:
+ print 'Datum: %d' % datum
+ print 'Correct Encoding: %s' % hex_encoding
+
+ writers_schema = schema.parse('"%s"' % number_type.lower())
+ writer, encoder, datum_writer = write_datum(datum, writers_schema)
+ writer.seek(0)
+ hex_val = avro_hexlify(writer)
+
+ print 'Read Encoding: %s' % hex_val
+ if hex_encoding == hex_val: correct += 1
+ print ''
+ return correct
+
+def check_skip_number(number_type):
+ print_test_name('TEST SKIP %s' % number_type.upper())
+ correct = 0
+ for value_to_skip, hex_encoding in BINARY_ENCODINGS:
+ VALUE_TO_READ = 6253
+ print 'Value to Skip: %d' % value_to_skip
+
+ # write the value to skip and a known value
+ writers_schema = schema.parse('"%s"' % number_type.lower())
+ writer, encoder, datum_writer = write_datum(value_to_skip, writers_schema)
+ datum_writer.write(VALUE_TO_READ, encoder)
+
+ # skip the value
+ reader = cStringIO.StringIO(writer.getvalue())
+ decoder = io.BinaryDecoder(reader)
+ decoder.skip_long()
+
+ # read data from string buffer
+ datum_reader = io.DatumReader(writers_schema)
+ read_value = datum_reader.read(decoder)
+
+ print 'Read Value: %d' % read_value
+ if read_value == VALUE_TO_READ: correct += 1
+ print ''
+ return correct
+
class TestIO(unittest.TestCase):
+ #
+ # BASIC FUNCTIONALITY
+ #
+
def test_validate(self):
- print ''
- print 'Test Validate'
- print '============='
- print ''
+ print_test_name('TEST VALIDATE')
passed = 0
- for expected_schema, datum in SCHEMAS_TO_VALIDATE:
- print expected_schema, datum
- validated = io.validate(schema.parse(expected_schema), datum)
- print validated
+ for example_schema, datum in SCHEMAS_TO_VALIDATE:
+ print 'Schema: %s' % example_schema
+ print 'Datum: %s' % datum
+ validated = io.validate(schema.parse(example_schema), datum)
+ print 'Valid: %s' % validated
if validated: passed += 1
self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE))
- # TODO(hammer): print bytes in python
- def test_encode(self):
- print ''
- print 'Test Encode'
- print '============='
- print ''
-
- # boolean
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_boolean(True)
- print 'Boolean: ' + repr(writer.getvalue())
-
- # string
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_utf8(unicode('adsfasdf09809dsf-=adsf'))
- print 'String: ' + repr(writer.getvalue())
-
- # int
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_int(1)
- print 'Int: ' + repr(writer.getvalue())
-
- # long
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_long(1)
- print 'Long: ' + repr(writer.getvalue())
-
- # float
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_float(1.0)
- print 'Float: ' + repr(writer.getvalue())
-
- # double
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_double(1.0)
- print 'Double: ' + repr(writer.getvalue())
-
- # bytes
- writer = cStringIO.StringIO()
- string_encoder = io.BinaryEncoder(writer)
- string_encoder.write_bytes('12345abcd')
- print 'Bytes: ' + repr(writer.getvalue())
-
- def test_decode(self):
- pass
-
- def test_datum_reader(self):
- pass
-
- def test_datum_writer(self):
- pass
-
def test_round_trip(self):
- print ''
- print 'TEST ROUND TRIP'
- print '==============='
- print ''
+ print_test_name('TEST ROUND TRIP')
correct = 0
for example_schema, datum in SCHEMAS_TO_VALIDATE:
print 'Schema: %s' % example_schema
print 'Datum: %s' % datum
- print 'Valid: %s' % io.validate(schema.parse(example_schema), datum)
- # write datum in binary to string buffer
- writer = cStringIO.StringIO()
- encoder = io.BinaryEncoder(writer)
- datum_writer = io.DatumWriter(schema.parse(example_schema))
- datum_writer.write(datum, encoder)
-
- # read data from string buffer
- reader = cStringIO.StringIO(writer.getvalue())
- decoder = io.BinaryDecoder(reader)
- datum_reader = io.DatumReader(schema.parse(example_schema))
- round_trip_datum = datum_reader.read(decoder)
+ writers_schema = schema.parse(example_schema)
+ writer, encoder, datum_writer = write_datum(datum, writers_schema)
+ round_trip_datum = read_datum(writer, writers_schema)
print 'Round Trip Datum: %s' % round_trip_datum
if datum == round_trip_datum: correct += 1
- print 'Correct Round Trip: %s' % (datum == round_trip_datum)
- print ''
self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
+ #
+ # BINARY ENCODING OF INT AND LONG
+ #
+
def test_binary_int_encoding(self):
- print ''
- print 'TEST BINARY INT ENCODING'
- print '========================'
- print ''
- correct = 0
- for value, hex_encoding in BINARY_INT_ENCODINGS:
- print 'Value: %d' % value
- print 'Correct Encoding: %s' % hex_encoding
-
- # write datum in binary to string buffer
- buffer = cStringIO.StringIO()
- encoder = io.BinaryEncoder(buffer)
- datum_writer = io.DatumWriter(schema.parse('"int"'))
- datum_writer.write(value, encoder)
-
- # read it out of the buffer and hexlify it
- buffer.seek(0)
- hex_val = avro_hexlify(buffer)
-
- # check it
- print 'Read Encoding: %s' % hex_val
- if hex_encoding == hex_val: correct += 1
- print ''
- self.assertEquals(correct, len(BINARY_INT_ENCODINGS))
+ correct = check_binary_encoding('int')
+ self.assertEquals(correct, len(BINARY_ENCODINGS))
def test_binary_long_encoding(self):
- print ''
- print 'TEST BINARY LONG ENCODING'
- print '========================='
- print ''
- correct = 0
- for value, hex_encoding in BINARY_INT_ENCODINGS:
- print 'Value: %d' % value
- print 'Correct Encoding: %s' % hex_encoding
-
- # write datum in binary to string buffer
- buffer = cStringIO.StringIO()
- encoder = io.BinaryEncoder(buffer)
- datum_writer = io.DatumWriter(schema.parse('"long"'))
- datum_writer.write(value, encoder)
-
- # read it out of the buffer and hexlify it
- buffer.seek(0)
- hex_val = avro_hexlify(buffer)
-
- # check it
- print 'Read Encoding: %s' % hex_val
- if hex_encoding == hex_val: correct += 1
- print ''
- self.assertEquals(correct, len(BINARY_INT_ENCODINGS))
+ correct = check_binary_encoding('long')
+ self.assertEquals(correct, len(BINARY_ENCODINGS))
+
+ def test_skip_int(self):
+ correct = check_skip_number('int')
+ self.assertEquals(correct, len(BINARY_ENCODINGS))
def test_skip_long(self):
- print ''
- print 'TEST SKIP LONG'
- print '=============='
- print ''
- correct = 0
- for value_to_skip, hex_encoding in BINARY_INT_ENCODINGS:
- VALUE_TO_READ = 6253
- print 'Value to Skip: %d' % value_to_skip
-
- # write some data in binary to string buffer
- writer = cStringIO.StringIO()
- encoder = io.BinaryEncoder(writer)
- datum_writer = io.DatumWriter(schema.parse('"long"'))
- datum_writer.write(value_to_skip, encoder)
- datum_writer.write(VALUE_TO_READ, encoder)
-
- # skip the value
- reader = cStringIO.StringIO(writer.getvalue())
- decoder = io.BinaryDecoder(reader)
- decoder.skip_long()
-
- # read data from string buffer
- datum_reader = io.DatumReader(schema.parse('"long"'))
- read_value = datum_reader.read(decoder)
-
- # check it
- print 'Read Value: %d' % read_value
- if read_value == VALUE_TO_READ: correct += 1
- print ''
- self.assertEquals(correct, len(BINARY_INT_ENCODINGS))
+ correct = check_skip_number('long')
+ self.assertEquals(correct, len(BINARY_ENCODINGS))
+
+ #
+ # SCHEMA RESOLUTION
+ #
+
+ def test_unknown_symbol(self):
+ print_test_name('TEST UNKNOWN SYMBOL')
+ writers_schema = schema.parse("""\
+ {"type": "enum", "name": "Test",
+ "symbols": ["FOO", "BAR"]}""")
+ datum_to_write = 'FOO'
+
+ readers_schema = schema.parse("""\
+ {"type": "enum", "name": "Test",
+ "symbols": ["BAR", "BAZ"]}""")
+
+ writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
+ reader = cStringIO.StringIO(writer.getvalue())
+ decoder = io.BinaryDecoder(reader)
+ datum_reader = io.DatumReader(writers_schema, readers_schema)
+ self.assertRaises(io.SchemaResolutionException, datum_reader.read, decoder)
+
+ def test_default_value(self):
+ print_test_name('TEST DEFAULT VALUE')
+ writers_schema = LONG_RECORD_SCHEMA
+ datum_to_write = LONG_RECORD_DATUM
- def test_skip_int(self):
- print ''
- print 'TEST SKIP INT'
- print '============='
- print ''
correct = 0
- for value_to_skip, hex_encoding in BINARY_INT_ENCODINGS:
- VALUE_TO_READ = 6253
- print 'Value to Skip: %d' % value_to_skip
-
- # write some data in binary to string buffer
- writer = cStringIO.StringIO()
- encoder = io.BinaryEncoder(writer)
- datum_writer = io.DatumWriter(schema.parse('"int"'))
- datum_writer.write(value_to_skip, encoder)
- datum_writer.write(VALUE_TO_READ, encoder)
-
- # skip the value
- reader = cStringIO.StringIO(writer.getvalue())
- decoder = io.BinaryDecoder(reader)
- decoder.skip_int()
-
- # read data from string buffer
- datum_reader = io.DatumReader(schema.parse('"int"'))
- read_value = datum_reader.read(decoder)
-
- # check it
- print 'Read Value: %d' % read_value
- if read_value == VALUE_TO_READ: correct += 1
- print ''
- self.assertEquals(correct, len(BINARY_INT_ENCODINGS))
+ for field_type, default_json, default_datum in DEFAULT_VALUE_EXAMPLES:
+ readers_schema = schema.parse("""\
+ {"type": "record", "name": "Test",
+ "fields": [{"name": "H", "type": %s, "default": %s}]}
+ """ % (field_type, default_json))
+ datum_to_read = {'H': default_datum}
+
+ writer, encoder, datum_writer = write_datum(datum_to_write,
writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ print 'Datum Read: %s' % datum_read
+ if datum_to_read == datum_read: correct += 1
+ self.assertEquals(correct, len(DEFAULT_VALUE_EXAMPLES))
+
+ def test_no_default_value(self):
+ print_test_name('TEST NO DEFAULT VALUE')
+ writers_schema = LONG_RECORD_SCHEMA
+ datum_to_write = LONG_RECORD_DATUM
+
+ readers_schema = schema.parse("""\
+ {"type": "record", "name": "Test",
+ "fields": [{"name": "H", "type": "int"}]}""")
+
+ writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
+ reader = cStringIO.StringIO(writer.getvalue())
+ decoder = io.BinaryDecoder(reader)
+ datum_reader = io.DatumReader(writers_schema, readers_schema)
+ self.assertRaises(io.SchemaResolutionException, datum_reader.read, decoder)
+
+ def test_projection(self):
+ print_test_name('TEST PROJECTION')
+ writers_schema = LONG_RECORD_SCHEMA
+ datum_to_write = LONG_RECORD_DATUM
+
+ readers_schema = schema.parse("""\
+ {"type": "record", "name": "Test",
+ "fields": [{"name": "E", "type": "int"},
+ {"name": "F", "type": "int"}]}""")
+ datum_to_read = {'E': 5, 'F': 6}
+
+ writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ print 'Datum Read: %s' % datum_read
+ self.assertEquals(datum_to_read, datum_read)
+
+ def test_field_order(self):
+ print_test_name('TEST FIELD ORDER')
+ writers_schema = LONG_RECORD_SCHEMA
+ datum_to_write = LONG_RECORD_DATUM
+
+ readers_schema = schema.parse("""\
+ {"type": "record", "name": "Test",
+ "fields": [{"name": "F", "type": "int"},
+ {"name": "E", "type": "int"}]}""")
+ datum_to_read = {'E': 5, 'F': 6}
+
+ writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
+ datum_read = read_datum(writer, writers_schema, readers_schema)
+ print 'Datum Read: %s' % datum_read
+ self.assertEquals(datum_to_read, datum_read)
if __name__ == '__main__':
unittest.main()