Author: dcreager
Date: Mon Sep 26 13:12:46 2011
New Revision: 1175854
URL: http://svn.apache.org/viewvc?rev=1175854&view=rev
Log:
AVRO-893. C: Avro data file functions using value API
The functions for reading and writing the contents of an Avro data file
have now been ported to the new value API. The structure of the new
functions is the same as before; we just read from or write into an
avro_value_t instead of an avro_datum_t.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/c/src/avro/io.h
avro/trunk/lang/c/src/avropipe.c
avro/trunk/lang/c/src/datafile.c
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1175854&r1=1175853&r2=1175854&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Sep 26 13:12:46 2011
@@ -22,6 +22,9 @@ Avro 1.6.0 (unreleased)
AVRO-863. C: Schema resolution using new value interface. (dcreager)
+ AVRO-893. C: Avro data file functions using new value interface.
+ (dcreager)
+
OPTIMIZATIONS
AVRO-853: Java: Cache Schema hash codes. (cutting)
Modified: avro/trunk/lang/c/src/avro/io.h
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro/io.h?rev=1175854&r1=1175853&r2=1175854&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro/io.h (original)
+++ avro/trunk/lang/c/src/avro/io.h Mon Sep 26 13:12:46 2011
@@ -92,12 +92,21 @@ int avro_file_writer_create(const char *
int avro_file_writer_open(const char *path, avro_file_writer_t * writer);
int avro_file_reader(const char *path, avro_file_reader_t * reader);
+avro_schema_t
+avro_file_reader_get_writer_schema(avro_file_reader_t reader);
+
int avro_file_writer_sync(avro_file_writer_t writer);
int avro_file_writer_flush(avro_file_writer_t writer);
int avro_file_writer_close(avro_file_writer_t writer);
int avro_file_reader_close(avro_file_reader_t reader);
+int
+avro_file_reader_read_value(avro_file_reader_t reader, avro_value_t *dest);
+
+int
+avro_file_writer_append_value(avro_file_writer_t writer, avro_value_t *src);
+
/*
* Legacy avro_datum_t API
*/
Modified: avro/trunk/lang/c/src/avropipe.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avropipe.c?rev=1175854&r1=1175853&r2=1175854&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avropipe.c (original)
+++ avro/trunk/lang/c/src/avropipe.c Mon Sep 26 13:12:46 2011
@@ -347,17 +347,21 @@ process_file(const char *filename)
avro_raw_string_t prefix;
avro_raw_string_init(&prefix);
- avro_datum_t datum;
+ avro_schema_t wschema = avro_file_reader_get_writer_schema(reader);
+ avro_value_iface_t *iface = avro_generic_class_from_schema(wschema);
+ avro_value_t value;
+ avro_generic_value_new(iface, &value);
+
size_t record_number = 0;
- for (; avro_file_reader_read(reader, NULL, &datum) == 0;
record_number++) {
- avro_value_t value;
- avro_datum_as_value(&value, datum);
+ for (; avro_file_reader_read_value(reader, &value) == 0;
record_number++) {
create_array_prefix(&prefix, "", record_number);
process_value(avro_raw_string_get(&prefix), &value);
}
avro_raw_string_done(&prefix);
+ avro_value_decref(&value);
+ avro_value_iface_decref(iface);
avro_file_reader_close(reader);
}
Modified: avro/trunk/lang/c/src/datafile.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datafile.c?rev=1175854&r1=1175853&r2=1175854&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datafile.c (original)
+++ avro/trunk/lang/c/src/datafile.c Mon Sep 26 13:12:46 2011
@@ -319,6 +319,13 @@ int avro_file_reader(const char *path, a
return rval;
}
+avro_schema_t
+avro_file_reader_get_writer_schema(avro_file_reader_t r)
+{
+ check_param(NULL, r, "reader");
+ return r->writers_schema;
+}
+
static int file_write_block(avro_file_writer_t w)
{
const avro_encoding_t *enc = &avro_binary_encoding;
@@ -368,6 +375,29 @@ int avro_file_writer_append(avro_file_wr
return 0;
}
+int
+avro_file_writer_append_value(avro_file_writer_t w, avro_value_t *value)
+{
+ int rval;
+ check_param(EINVAL, w, "writer");
+ check_param(EINVAL, value, "value");
+
+ rval = avro_value_write(w->datum_writer, value);
+ if (rval) {
+ check(rval, file_write_block(w));
+ rval = avro_value_write(w->datum_writer, value);
+ if (rval) {
+ avro_set_error("Value too large for file block size");
+ /* TODO: if the value encoder larger than our buffer,
+ just write a single large datum */
+ return rval;
+ }
+ }
+ w->block_count++;
+ w->block_size = avro_writer_tell(w->datum_writer);
+ return 0;
+}
+
int avro_file_writer_sync(avro_file_writer_t w)
{
return file_write_block(w);
@@ -418,6 +448,31 @@ int avro_file_reader_read(avro_file_read
return 0;
}
+int
+avro_file_reader_read_value(avro_file_reader_t r, avro_value_t *value)
+{
+ int rval;
+ char sync[16];
+
+ check_param(EINVAL, r, "reader");
+ check_param(EINVAL, value, "value");
+
+ check(rval, avro_value_read(r->reader, value));
+ r->blocks_read++;
+
+ if (r->blocks_read == r->blocks_total) {
+ check(rval, avro_read(r->reader, sync, sizeof(sync)));
+ if (memcmp(r->sync, sync, sizeof(r->sync)) != 0) {
+ /* wrong sync bytes */
+ avro_set_error("Incorrect sync bytes");
+ return EILSEQ;
+ }
+ /* For now, ignore errors (e.g. EOF) */
+ file_read_block_count(r);
+ }
+ return 0;
+}
+
int avro_file_reader_close(avro_file_reader_t reader)
{
avro_schema_decref(reader->writers_schema);