Author: dcreager
Date: Thu May 12 14:57:27 2011
New Revision: 1102332
URL: http://svn.apache.org/viewvc?rev=1102332&view=rev
Log:
AVRO-818. C: Fix data file corruption bug
This patch fixes the C library's data file writer to produce correct
files when there are enough records to produce multiple blocks in the
file. The logic in datafile.c is that we have a fixed-buffer in-memory
avro_writer_t instance. When you append records to the data file, they
go into this memory buffer. If we get an error serializing into the
memory buffer, it's presumably because we've filled it, so we write out
the memory buffer's contents as a new block in the file, clear the
buffer, and try again.
The problem is that the failed serialization into the memory buffer
isn't atomic; some of the serialization will have made it into the
buffer before we discover that there's not enough room. Previously, this
incomplete record would then make it into the file. To fix this, we
keep track of the size of the in-memory buffer after the most recent
successfully serialized record, and use this as the block size when we
write a block to disk. This ensures that even if there are incomplete
records at the end of the memory buffer, we don't include them in the
block.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/c/examples/CMakeLists.txt
avro/trunk/lang/c/examples/quickstop.c
avro/trunk/lang/c/src/datafile.c
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1102332&r1=1102331&r2=1102332&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu May 12 14:57:27 2011
@@ -14,6 +14,8 @@ Avro 1.6.0 (unreleased)
Implemented writeBuffers for the NettyTransceiver to allow it to
process one-way messages.
+ AVRO-818. C: Fix data file corruption bug in C library (dcreager)
+
Avro 1.5.1 (unreleased)
NEW FEATURES
Modified: avro/trunk/lang/c/examples/CMakeLists.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/examples/CMakeLists.txt?rev=1102332&r1=1102331&r2=1102332&view=diff
==============================================================================
--- avro/trunk/lang/c/examples/CMakeLists.txt (original)
+++ avro/trunk/lang/c/examples/CMakeLists.txt Thu May 12 14:57:27 2011
@@ -20,3 +20,6 @@
add_executable(quickstop quickstop.c)
target_link_libraries(quickstop avro-static)
+add_test(quickstop
+ ${CMAKE_COMMAND} -E chdir ${AvroC_SOURCE_DIR}/examples
+ ${CMAKE_CURRENT_BINARY_DIR}/quickstop)
Modified: avro/trunk/lang/c/examples/quickstop.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/examples/quickstop.c?rev=1102332&r1=1102331&r2=1102332&view=diff
==============================================================================
--- avro/trunk/lang/c/examples/quickstop.c (original)
+++ avro/trunk/lang/c/examples/quickstop.c Thu May 12 14:57:27 2011
@@ -82,7 +82,7 @@ add_person(avro_file_writer_t db, const
avro_datum_decref(phone_datum);
avro_datum_decref(person);
- fprintf(stdout, "Successfully added %s, %s id=%"PRId64"\n", last,
first, id);
+ //fprintf(stdout, "Successfully added %s, %s id=%"PRId64"\n", last,
first, id);
}
int print_person(avro_file_reader_t db, avro_schema_t reader_schema)
@@ -134,6 +134,7 @@ int main(void)
avro_schema_t projection_schema, first_name_schema, phone_schema;
int64_t i;
const char *dbname = "quickstop.db";
+ char number[15] = {0};
/* Initialize the schema structure from JSON */
init_schema();
@@ -146,13 +147,18 @@ int main(void)
fprintf(stderr, "There was an error creating %s\n", dbname);
exit(EXIT_FAILURE);
}
- /* Add people to the database */
- add_person(db, "Dante", "Hicks", "(555) 123-4567", 32);
- add_person(db, "Randal", "Graves", "(555) 123-5678", 30);
- add_person(db, "Veronica", "Loughran", "(555) 123-0987", 28);
- add_person(db, "Caitlin", "Bree", "(555) 123-2323", 27);
- add_person(db, "Bob", "Silent", "(555) 123-6422", 29);
- add_person(db, "Jay", "???", "(555) 123-9182", 26);
+
+ /* Add lots of people to the database */
+ for (i = 0; i < 1000; i++)
+ {
+ sprintf(number, "(%d)", (int)i);
+ add_person(db, "Dante", "Hicks", number, 32);
+ add_person(db, "Randal", "Graves", "(555) 123-5678", 30);
+ add_person(db, "Veronica", "Loughran", "(555) 123-0987", 28);
+ add_person(db, "Caitlin", "Bree", "(555) 123-2323", 27);
+ add_person(db, "Bob", "Silent", "(555) 123-6422", 29);
+ add_person(db, "Jay", "???", number, 26);
+ }
avro_file_writer_close(db);
fprintf(stdout, "\nNow let's read all the records back out\n");
Modified: avro/trunk/lang/c/src/datafile.c
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/datafile.c?rev=1102332&r1=1102331&r2=1102332&view=diff
==============================================================================
--- avro/trunk/lang/c/src/datafile.c (original)
+++ avro/trunk/lang/c/src/datafile.c Thu May 12 14:57:27 2011
@@ -40,6 +40,7 @@ struct avro_file_writer_t_ {
avro_writer_t writer;
char sync[16];
int block_count;
+ size_t block_size;
avro_writer_t datum_writer;
char datum_buffer[16 * 1024];
};
@@ -315,15 +316,14 @@ static int file_write_block(avro_file_wr
int rval;
if (w->block_count) {
- int64_t blocklen = avro_writer_tell(w->datum_writer);
/* Write the block count */
check_prefix(rval, enc->write_long(w->writer, w->block_count),
"Cannot write file block count: ");
/* Write the block length */
- check_prefix(rval, enc->write_long(w->writer, blocklen),
+ check_prefix(rval, enc->write_long(w->writer, w->block_size),
"Cannot write file block size: ");
/* Write the block */
- check_prefix(rval, avro_write(w->writer, w->datum_buffer,
blocklen),
+ check_prefix(rval, avro_write(w->writer, w->datum_buffer,
w->block_size),
"Cannot write file block: ");
/* Write the sync marker */
check_prefix(rval, write_sync(w),
@@ -331,6 +331,7 @@ static int file_write_block(avro_file_wr
/* Reset the datum writer */
avro_writer_reset(w->datum_writer);
w->block_count = 0;
+ w->block_size = 0;
}
return 0;
}
@@ -354,6 +355,7 @@ int avro_file_writer_append(avro_file_wr
}
}
w->block_count++;
+ w->block_size = avro_writer_tell(w->datum_writer);
return 0;
}