Author: philz
Date: Wed Feb 3 06:58:46 2010
New Revision: 905913
URL: http://svn.apache.org/viewvc?rev=905913&view=rev
Log:
AVRO-386. Python implementation of compression (philz)
Added:
hadoop/avro/trunk/lang/py/src/avro/tool.py
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/py/src/avro/datafile.py
hadoop/avro/trunk/lang/py/test/test_datafile.py
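(For orientation: a minimal sketch of how a caller might exercise the new codec parameter once this patch is applied. The file name, schema, and datum below are illustrative only and mirror the updated tests; they are not part of the change.)

    from avro import schema, io, datafile

    # Hypothetical schema and datum; any supported Avro type would do.
    schema_object = schema.parse('"string"')
    datum = 'hello, deflate'

    # Write a container file using the new 'deflate' codec.
    writer = open('example.avro', 'wb')
    dfw = datafile.DataFileWriter(writer, io.DatumWriter(), schema_object,
                                  codec='deflate')
    dfw.append(datum)
    dfw.close()

    # Read it back; the reader picks the codec up from the file metadata.
    reader = open('example.avro', 'rb')
    for d in datafile.DataFileReader(reader, io.DatumReader()):
        print d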
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=905913&r1=905912&r2=905913&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Feb 3 06:58:46 2010
@@ -279,6 +279,8 @@
AVRO-388. Using ResolvingDecoder in GenericDatumReader (thiru)
+ AVRO-386. Python implementation of compression (philz)
+
OPTIMIZATIONS
AVRO-172. More efficient schema processing (massie)
Modified: hadoop/avro/trunk/lang/py/src/avro/datafile.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/datafile.py?rev=905913&r1=905912&r2=905913&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/datafile.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/datafile.py Wed Feb 3 06:58:46 2010
@@ -16,6 +16,7 @@
"""
Read/Write Avro File Object Containers.
"""
+import zlib
import uuid
import cStringIO
from avro import schema
@@ -37,9 +38,12 @@
{"name": "meta", "type": {"type": "map", "values": "bytes"}},
{"name": "sync", "type": {"type": "fixed", "name": "sync", "size": %d}}]}
""" % (MAGIC_SIZE, SYNC_SIZE))
-VALID_CODECS = ['null']
+VALID_CODECS = ['null', 'deflate']
VALID_ENCODINGS = ['binary'] # not used yet
+CODEC_KEY = "avro.codec"
+SCHEMA_KEY = "avro.schema"
+
#
# Exceptions
#
@@ -61,9 +65,11 @@
return uuid.uuid4().bytes
# TODO(hammer): make 'encoder' a metadata property
- def __init__(self, writer, datum_writer, writers_schema=None):
+ def __init__(self, writer, datum_writer, writers_schema=None, codec='null'):
"""
If the schema is not present, presume we're appending.
+
+ @param writer: File-like object to write into.
"""
self._writer = writer
self._encoder = io.BinaryEncoder(writer)
@@ -74,9 +80,11 @@
self._meta = {}
if writers_schema is not None:
+ if codec not in VALID_CODECS:
+ raise DataFileException("Unknown codec: %r" % codec)
self._sync_marker = DataFileWriter.generate_sync_marker()
- self.set_meta('codec', 'null')
- self.set_meta('schema', str(writers_schema))
+ self.set_meta('avro.codec', codec)
+ self.set_meta('avro.schema', str(writers_schema))
self.datum_writer.writers_schema = writers_schema
self._write_header()
else:
@@ -86,11 +94,11 @@
# TODO(hammer): collect arbitrary metadata
# collect metadata
self._sync_marker = dfr.sync_marker
- self.set_meta('codec', dfr.get_meta('codec'))
+ self.set_meta('avro.codec', dfr.get_meta('avro.codec'))
# get schema used to write existing file
- schema_from_file = dfr.get_meta('schema')
- self.set_meta('schema', schema_from_file)
+ schema_from_file = dfr.get_meta('avro.schema')
+ self.set_meta('avro.schema', schema_from_file)
self.datum_writer.writers_schema = schema.parse(schema_from_file)
# seek to the end of the file and prepare for writing
@@ -123,19 +131,28 @@
self.datum_writer.write_data(META_SCHEMA, header, self.encoder)
# TODO(hammer): make a schema for blocks and use datum_writer
- # TODO(hammer): use codec when writing the block contents
def _write_block(self):
if self.block_count > 0:
# write number of items in block
self.encoder.write_long(self.block_count)
# write block contents
- if self.get_meta('codec') == 'null':
- self.writer.write(self.buffer_writer.getvalue())
+ uncompressed_data = self.buffer_writer.getvalue()
+ if self.get_meta(CODEC_KEY) == 'null':
+ compressed_data = uncompressed_data
+ elif self.get_meta(CODEC_KEY) == 'deflate':
+ # The first two bytes and the last byte are the zlib
+ # wrapper around the deflate data.
+ compressed_data = zlib.compress(uncompressed_data)[2:-1]
else:
- fail_msg = '"%s" codec is not supported.' % self.get_meta('codec')
+ fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
raise DataFileException(fail_msg)
+ # Write length of block
+ self.encoder.write_long(len(compressed_data))
+ # Write block
+ self.writer.write(compressed_data)
+
# write sync marker
self.writer.write(self.sync_marker)
@@ -177,30 +194,34 @@
# TODO(hammer): allow user to specify the encoder
def __init__(self, reader, datum_reader):
self._reader = reader
- self._decoder = io.BinaryDecoder(reader)
+ self._raw_decoder = io.BinaryDecoder(reader)
+ self._datum_decoder = None # May be reset at every block.
self._datum_reader = datum_reader
# read the header: magic, meta, sync
self._read_header()
# ensure codec is valid
- codec_from_file = self.get_meta('codec')
- if codec_from_file is not None and codec_from_file not in VALID_CODECS:
- raise DataFileException('Unknown codec: %s.' % codec_from_file)
+ self.codec = self.get_meta('avro.codec')
+ if self.codec is None:
+ self.codec = "null"
+ if self.codec not in VALID_CODECS:
+ raise DataFileException('Unknown codec: %s.' % self.codec)
# get file length
self._file_length = self.determine_file_length()
# get ready to read
self._block_count = 0
- self.datum_reader.writers_schema = schema.parse(self.get_meta('schema'))
+ self.datum_reader.writers_schema = schema.parse(self.get_meta(SCHEMA_KEY))
def __iter__(self):
return self
# read-only properties
reader = property(lambda self: self._reader)
- decoder = property(lambda self: self._decoder)
+ raw_decoder = property(lambda self: self._raw_decoder)
+ datum_decoder = property(lambda self: self._datum_decoder)
datum_reader = property(lambda self: self._datum_reader)
sync_marker = property(lambda self: self._sync_marker)
meta = property(lambda self: self._meta)
@@ -235,7 +256,8 @@
self.reader.seek(0, 0)
# read header into a dict
- header = self.datum_reader.read_data(META_SCHEMA, META_SCHEMA,
- self.decoder)
+ header = self.datum_reader.read_data(
+ META_SCHEMA, META_SCHEMA, self.raw_decoder)
# check magic number
if header.get('magic') != MAGIC:
@@ -250,7 +272,19 @@
self._sync_marker = header['sync']
def _read_block_header(self):
- self.block_count = self.decoder.read_long()
+ self.block_count = self.raw_decoder.read_long()
+ if self.codec == "null":
+ # Skip a long; we don't need to use the length.
+ self.raw_decoder.skip_long()
+ self._datum_decoder = self._raw_decoder
+ else:
+ # Compressed data is stored as (length, data), which
+ # corresponds to how the "bytes" type is stored.
+ data = self.raw_decoder.read_bytes()
+ # -15 is the log of the window size; negative indicates
+ # "raw" (no zlib headers) decompression. See zlib.h.
+ uncompressed = zlib.decompress(data, -15)
+ self._datum_decoder = io.BinaryDecoder(cStringIO.StringIO(uncompressed))
def _skip_sync(self):
"""
@@ -277,7 +311,7 @@
else:
self._read_block_header()
- datum = self.datum_reader.read(self.decoder)
+ datum = self.datum_reader.read(self.datum_decoder)
self.block_count -= 1
return datum
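(Note on the deflate framing used above: the writer strips the zlib wrapper from zlib.compress() output, and the reader inflates with a negative window size, which selects "raw" deflate. A minimal round-trip sketch, independent of the Avro classes; the payload is made up:)

    import zlib

    original = 'some repetitive payload ' * 50

    # Writer side: drop the two-byte zlib header and the final trailer
    # byte, as _write_block() does above.
    block = zlib.compress(original)[2:-1]

    # Reader side: -15 selects raw deflate (no zlib header or checksum
    # handling), matching _read_block_header().
    assert zlib.decompress(block, -15) == original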
Added: hadoop/avro/trunk/lang/py/src/avro/tool.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/tool.py?rev=905913&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/tool.py (added)
+++ hadoop/avro/trunk/lang/py/src/avro/tool.py Wed Feb 3 06:58:46 2010
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Command-line tool for manipulating Avro data files.
+
+NOTE: The API for the command-line tool is experimental.
+"""
+
+import sys
+from avro import datafile, io
+
+def main(args=sys.argv):
+ if len(args) == 1:
+ print "Usage: %s (dump)" % args[0]
+ return 1
+
+ if args[1] == "dump":
+ if len(args) != 3:
+ print "Usage: %s dump input_file" % args[0]
+ return 1
+ for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
+ print repr(d)
+ return 0
+
+def file_or_stdin(f):
+ if f == "-":
+ return sys.stdin
+ else:
+ return file(f)
+
+if __name__ == "__main__":
+ sys.exit(main(sys.argv))
Modified: hadoop/avro/trunk/lang/py/test/test_datafile.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/test_datafile.py?rev=905913&r1=905912&r2=905913&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/test/test_datafile.py (original)
+++ hadoop/avro/trunk/lang/py/test/test_datafile.py Wed Feb 3 06:58:46 2010
@@ -51,6 +51,7 @@
)
FILENAME = 'test_datafile.out'
+CODECS_TO_VALIDATE = ('null', 'deflate')
# TODO(hammer): clean up written files with ant, not os.remove
class TestDataFile(unittest.TestCase):
@@ -61,38 +62,40 @@
print ''
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
- print ''
- print 'SCHEMA NUMBER %d' % (i + 1)
- print '================'
- print ''
- print 'Schema: %s' % example_schema
- print 'Datum: %s' % datum
-
- # write data in binary to file 10 times
- writer = open(FILENAME, 'wb')
- datum_writer = io.DatumWriter()
- schema_object = schema.parse(example_schema)
- dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
- for i in range(10):
- dfw.append(datum)
- dfw.close()
+ for codec in CODECS_TO_VALIDATE:
+ print ''
+ print 'SCHEMA NUMBER %d' % (i + 1)
+ print '================'
+ print ''
+ print 'Schema: %s' % example_schema
+ print 'Datum: %s' % datum
+ print 'Codec: %s' % codec
+
+ # write data in binary to file 10 times
+ writer = open(FILENAME, 'wb')
+ datum_writer = io.DatumWriter()
+ schema_object = schema.parse(example_schema)
+ dfw = datafile.DataFileWriter(writer, datum_writer, schema_object,
+ codec=codec)
+ for i in range(10):
+ dfw.append(datum)
+ dfw.close()
- # read data in binary from file
- reader = open(FILENAME, 'rb')
- datum_reader = io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
- round_trip_data = []
- for datum in dfr:
- round_trip_data.append(datum)
-
- print 'Round Trip Data: %s' % round_trip_data
- print 'Round Trip Data Length: %d' % len(round_trip_data)
- is_correct = [datum] * 10 == round_trip_data
- if is_correct: correct += 1
- print 'Correct Round Trip: %s' % is_correct
- print ''
+ # read data in binary from file
+ reader = open(FILENAME, 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ round_trip_data = []
+ for datum in dfr:
+ round_trip_data.append(datum)
+
+ print 'Round Trip Data: %s' % round_trip_data
+ print 'Round Trip Data Length: %d' % len(round_trip_data)
+ is_correct = [datum] * 10 == round_trip_data
+ if is_correct: correct += 1
+ print 'Correct Round Trip: %s' % is_correct
+ print ''
os.remove(FILENAME)
- self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
+ self.assertEquals(correct,
+ len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
def test_append(self):
print ''
@@ -101,44 +104,46 @@
print ''
correct = 0
for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
- print ''
- print 'SCHEMA NUMBER %d' % (i + 1)
- print '================'
- print ''
- print 'Schema: %s' % example_schema
- print 'Datum: %s' % datum
-
- # write data in binary to file once
- writer = open(FILENAME, 'wb')
- datum_writer = io.DatumWriter()
- schema_object = schema.parse(example_schema)
- dfw = datafile.DataFileWriter(writer, datum_writer, schema_object)
- dfw.append(datum)
- dfw.close()
-
- # open file, write, and close nine times
- for i in range(9):
- writer = open(FILENAME, 'ab+')
- dfw = datafile.DataFileWriter(writer, io.DatumWriter())
+ for codec in CODECS_TO_VALIDATE:
+ print ''
+ print 'SCHEMA NUMBER %d' % (i + 1)
+ print '================'
+ print ''
+ print 'Schema: %s' % example_schema
+ print 'Datum: %s' % datum
+ print 'Codec: %s' % codec
+
+ # write data in binary to file once
+ writer = open(FILENAME, 'wb')
+ datum_writer = io.DatumWriter()
+ schema_object = schema.parse(example_schema)
+ dfw = datafile.DataFileWriter(writer, datum_writer, schema_object,
+ codec=codec)
dfw.append(datum)
dfw.close()
- # read data in binary from file
- reader = open(FILENAME, 'rb')
- datum_reader = io.DatumReader()
- dfr = datafile.DataFileReader(reader, datum_reader)
- appended_data = []
- for datum in dfr:
- appended_data.append(datum)
-
- print 'Appended Data: %s' % appended_data
- print 'Appended Data Length: %d' % len(appended_data)
- is_correct = [datum] * 10 == appended_data
- if is_correct: correct += 1
- print 'Correct Appended: %s' % is_correct
- print ''
+ # open file, write, and close nine times
+ for i in range(9):
+ writer = open(FILENAME, 'ab+')
+ dfw = datafile.DataFileWriter(writer, io.DatumWriter())
+ dfw.append(datum)
+ dfw.close()
+
+ # read data in binary from file
+ reader = open(FILENAME, 'rb')
+ datum_reader = io.DatumReader()
+ dfr = datafile.DataFileReader(reader, datum_reader)
+ appended_data = []
+ for datum in dfr:
+ appended_data.append(datum)
+
+ print 'Appended Data: %s' % appended_data
+ print 'Appended Data Length: %d' % len(appended_data)
+ is_correct = [datum] * 10 == appended_data
+ if is_correct: correct += 1
+ print 'Correct Appended: %s' % is_correct
+ print ''
os.remove(FILENAME)
- self.assertEquals(correct, len(SCHEMAS_TO_VALIDATE))
+ self.assertEquals(correct,
+ len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
if __name__ == '__main__':
unittest.main()