Author: massie
Date: Thu Jan 28 02:06:31 2010
New Revision: 903941

URL: http://svn.apache.org/viewvc?rev=903941&view=rev
Log:
AVRO-384. Add schema projection to the C implementation

Added:
    hadoop/avro/trunk/lang/c/src/datum_skip.c
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/lang/c/docs/index.txt
    hadoop/avro/trunk/lang/c/examples/quickstop.c
    hadoop/avro/trunk/lang/c/src/Makefile.am
    hadoop/avro/trunk/lang/c/src/avro.h
    hadoop/avro/trunk/lang/c/src/datum_read.c
    hadoop/avro/trunk/lang/c/src/io.c
    hadoop/avro/trunk/lang/c/tests/Makefile.am
    hadoop/avro/trunk/lang/c/tests/test_valgrind
    hadoop/avro/trunk/lang/c/version.sh

Modified: hadoop/avro/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Jan 28 02:06:31 2010
@@ -265,6 +265,8 @@
     AVRO-381. Update documentation to talk about reference counting and 
               memory management (massie)
 
+    AVRO-384. Add schema projection to the C implementation (massie)
+
   OPTIMIZATIONS
 
     AVRO-172. More efficient schema processing (massie)

Modified: hadoop/avro/trunk/lang/c/docs/index.txt
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/docs/index.txt?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/docs/index.txt (original)
+++ hadoop/avro/trunk/lang/c/docs/index.txt Thu Jan 28 02:06:31 2010
@@ -154,16 +154,25 @@
 | 32 34 .. .. .. .. .. .. | .. .. .. .. .. .. .. .. |  24..............
 
 Now let's read all the records back out
-1 |           Dante |           Hicks |            (555) 123-4567 |   32
-2 |          Randal |          Graves |            (555) 123-5678 |   30
-3 |        Veronica |        Loughran |            (555) 123-0987 |   28
-4 |         Caitlin |            Bree |            (555) 123-2323 |   27
-5 |             Bob |          Silent |            (555) 123-6422 |   29
-6 |             Jay |             ??? |            (555) 123-9182 |   26
+1 |           Dante |           Hicks |  (555) 123-4567 | 32
+2 |          Randal |          Graves |  (555) 123-5678 | 30
+3 |        Veronica |        Loughran |  (555) 123-0987 | 28
+4 |         Caitlin |            Bree |  (555) 123-2323 | 27
+5 |             Bob |          Silent |  (555) 123-6422 | 29
+6 |             Jay |             ??? |  (555) 123-9182 | 26
+
+
+Use projection to print only the First name and phone numbers
+          Dante |  (555) 123-4567 | 
+         Randal |  (555) 123-5678 | 
+       Veronica |  (555) 123-0987 | 
+        Caitlin |  (555) 123-2323 | 
+            Bob |  (555) 123-6422 | 
+            Jay |  (555) 123-9182 | 
 ----
 
-The *Quick Stop* store owner was so pleased, he asked you to create a 
-movie database for his *RST Video* store.  
+The *Quick Stop* owner was so pleased, he asked you to create a 
+movie database for his *RST Video* store.
 
 == Reference files
 

Modified: hadoop/avro/trunk/lang/c/examples/quickstop.c
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/examples/quickstop.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/examples/quickstop.c (original)
+++ hadoop/avro/trunk/lang/c/examples/quickstop.c Thu Jan 28 02:06:31 2010
@@ -20,8 +20,6 @@
 
 char buf[4096];
 avro_schema_t person_schema;
-avro_reader_t reader;
-avro_writer_t writer;
 int64_t id = 0;
 
 /* A simple schema for our tutorial */
@@ -48,7 +46,8 @@
 
 /* Create a datum to match the person schema and save it */
 void
-add_person(const char *first, const char *last, const char *phone, int32_t age)
+add_person(avro_writer_t writer, const char *first, const char *last,
+          const char *phone, int32_t age)
 {
        avro_datum_t person = avro_record("Person");
 
@@ -84,12 +83,12 @@
        fprintf(stdout, "Successfully added %s, %s id=%ld\n", last, first, id);
 }
 
-int print_person(void)
+int print_person(avro_reader_t reader, avro_schema_t reader_schema)
 {
        int rval;
        avro_datum_t person;
 
-       rval = avro_read_data(reader, person_schema, NULL, &person);
+       rval = avro_read_data(reader, person_schema, reader_schema, &person);
        if (rval == 0) {
                int64_t i64;
                int32_t i32;
@@ -97,22 +96,27 @@
                avro_datum_t id_datum, first_datum, last_datum, phone_datum,
                    age_datum;
 
-               avro_record_get(person, "ID", &id_datum);
-               avro_record_get(person, "First", &first_datum);
-               avro_record_get(person, "Last", &last_datum);
-               avro_record_get(person, "Phone", &phone_datum);
-               avro_record_get(person, "Age", &age_datum);
-
-               avro_int64_get(id_datum, &i64);
-               fprintf(stdout, "%ld | ", i64);
-               avro_string_get(first_datum, &p);
-               fprintf(stdout, "%15s | ", p);
-               avro_string_get(last_datum, &p);
-               fprintf(stdout, "%15s | ", p);
-               avro_string_get(phone_datum, &p);
-               fprintf(stdout, "%25s | ", p);
-               avro_int32_get(age_datum, &i32);
-               fprintf(stdout, "  %2d\n", i32);
+               if (avro_record_get(person, "ID", &id_datum) == 0) {
+                       avro_int64_get(id_datum, &i64);
+                       fprintf(stdout, "%ld | ", i64);
+               }
+               if (avro_record_get(person, "First", &first_datum) == 0) {
+                       avro_string_get(first_datum, &p);
+                       fprintf(stdout, "%15s | ", p);
+               }
+               if (avro_record_get(person, "Last", &last_datum) == 0) {
+                       avro_string_get(last_datum, &p);
+                       fprintf(stdout, "%15s | ", p);
+               }
+               if (avro_record_get(person, "Phone", &phone_datum) == 0) {
+                       avro_string_get(phone_datum, &p);
+                       fprintf(stdout, "%15s | ", p);
+               }
+               if (avro_record_get(person, "Age", &age_datum) == 0) {
+                       avro_int32_get(age_datum, &i32);
+                       fprintf(stdout, "%ld", i32);
+               }
+               fprintf(stdout, "\n");
 
                /* We no longer need this memory */
                avro_datum_decref(person);
@@ -122,22 +126,23 @@
 
 int main(void)
 {
+       avro_reader_t reader;
+       avro_writer_t writer;
+       avro_schema_t projection_schema, first_name_schema, phone_schema;
        int64_t i;
 
-       /* Create readers and writers backed by memory */
-       writer = avro_writer_memory(buf, sizeof(buf));
-       reader = avro_reader_memory(buf, sizeof(buf));
-
        /* Initialize the schema structure from JSON */
        init_schema();
 
        /* Add people to the database */
-       add_person("Dante", "Hicks", "(555) 123-4567", 32);
-       add_person("Randal", "Graves", "(555) 123-5678", 30);
-       add_person("Veronica", "Loughran", "(555) 123-0987", 28);
-       add_person("Caitlin", "Bree", "(555) 123-2323", 27);
-       add_person("Bob", "Silent", "(555) 123-6422", 29);
-       add_person("Jay", "???", "(555) 123-9182", 26);
+       writer = avro_writer_memory(buf, sizeof(buf));
+       add_person(writer, "Dante", "Hicks", "(555) 123-4567", 32);
+       add_person(writer, "Randal", "Graves", "(555) 123-5678", 30);
+       add_person(writer, "Veronica", "Loughran", "(555) 123-0987", 28);
+       add_person(writer, "Caitlin", "Bree", "(555) 123-2323", 27);
+       add_person(writer, "Bob", "Silent", "(555) 123-6422", 29);
+       add_person(writer, "Jay", "???", "(555) 123-9182", 26);
+       avro_writer_free(writer);
 
        fprintf(stdout,
                "\nAvro is compact. Here is the data for all %ld people.\n",
@@ -147,18 +152,43 @@
        fprintf(stdout, "\nNow let's read all the records back out\n");
 
        /* Read all the records and print them */
+       reader = avro_reader_memory(buf, sizeof(buf));
        for (i = 0; i < id; i++) {
-               if (print_person()) {
+               if (print_person(reader, NULL)) {
                        fprintf(stderr, "Error printing person\n");
                        exit(EXIT_FAILURE);
                }
        }
+       avro_reader_free(reader);
 
-       /* We don't need this schema anymore */
-       avro_schema_decref(person_schema);
+       /* You can also use projection to decode only the data you are
+          interested in.  This is particularly useful when you have
+          huge data sets and you're only interested in particular fields,
+          e.g. your contacts' first names and phone numbers */
+       projection_schema = avro_schema_record("Person");
+       first_name_schema = avro_schema_string();
+       phone_schema = avro_schema_string();
+       avro_schema_record_field_append(projection_schema, "First",
+                                       first_name_schema);
+       avro_schema_record_field_append(projection_schema, "Phone",
+                                       phone_schema);
 
-       /* We dont' need the reader/writer anymore */
+       /* Read only the record you're interested in */
+       fprintf(stdout,
+               "\n\nUse projection to print only the First name and phone numbers\n");
+       reader = avro_reader_memory(buf, sizeof(buf));
+       for (i = 0; i < id; i++) {
+               if (print_person(reader, projection_schema)) {
+                       fprintf(stderr, "Error printing person\n");
+                       exit(EXIT_FAILURE);
+               }
+       }
        avro_reader_free(reader);
-       avro_writer_free(writer);
+       avro_schema_decref(first_name_schema);
+       avro_schema_decref(phone_schema);
+       avro_schema_decref(projection_schema);
+
+       /* We don't need this schema anymore */
+       avro_schema_decref(person_schema);
        return 0;
 }

Modified: hadoop/avro/trunk/lang/c/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/Makefile.am?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c/src/Makefile.am Thu Jan 28 02:06:31 2010
@@ -7,7 +7,7 @@
 
 lib_LTLIBRARIES = libavro.la
 libavro_la_SOURCES = st.c st.h schema.c schema.h schema_printf.c schema_equal.c \
-datum.c datum_equal.c datum_validate.c datum_read.c datum_write.c datum.h \
+datum.c datum_equal.c datum_validate.c datum_read.c datum_skip.c datum_write.c datum.h \
 io.c dump.c dump.h encoding_binary.c \
 container_of.h queue.h encoding.h
 libavro_la_LIBADD = $(top_builddir)/jansson/src/.libs/libjansson.a

Modified: hadoop/avro/trunk/lang/c/src/avro.h
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/avro.h?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/avro.h (original)
+++ hadoop/avro/trunk/lang/c/src/avro.h Thu Jan 28 02:06:31 2010
@@ -255,6 +255,7 @@
 int avro_read_data(avro_reader_t reader,
                   avro_schema_t writer_schema,
                   avro_schema_t reader_schema, avro_datum_t * datum);
+int avro_skip_data(avro_reader_t reader, avro_schema_t writer_schema);
 int avro_write_data(avro_writer_t writer,
                    avro_schema_t writer_schema, avro_datum_t datum);
 

Modified: hadoop/avro/trunk/lang/c/src/datum_read.c
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/datum_read.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/datum_read.c (original)
+++ hadoop/avro/trunk/lang/c/src/datum_read.c Thu Jan 28 02:06:31 2010
@@ -285,8 +285,10 @@
                        }
                        avro_datum_decref(field_datum);
                } else {
-                       /* TODO: skip_record */
-                       return -1;
+                       rval = avro_skip_data(reader, field->type);
+                       if (rval) {
+                               return rval;
+                       }
                }
        }
        return 0;

Added: hadoop/avro/trunk/lang/c/src/datum_skip.c
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/datum_skip.c?rev=903941&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c/src/datum_skip.c (added)
+++ hadoop/avro/trunk/lang/c/src/datum_skip.c Thu Jan 28 02:06:31 2010
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0 
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License. 
+ */
+#include <errno.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include "encoding.h"
+#include "schema.h"
+
+/*
+ * Skip over an array value in the input.
+ *
+ * The array is consumed as a series of blocks.  Each block starts with a
+ * long item count; a count of zero ends the array.  A negative count means
+ * its absolute value is the item count and that a long block size follows,
+ * which is read here but not otherwise used.  Every item is skipped with
+ * avro_skip_data() using the writer's item schema.
+ *
+ * Returns 0 on success, EILSEQ when a negative block count cannot be
+ * negated, or the non-zero code of the first failing read/skip call.
+ */
+static int skip_array(avro_reader_t reader, const avro_encoding_t * enc,
+                     struct avro_array_schema_t *writers_schema)
+{
+       int rval;
+       int64_t i;
+       int64_t block_count;
+       int64_t block_size;
+
+       rval = enc->read_long(reader, &block_count);
+       if (rval) {
+               return rval;
+       }
+
+       while (block_count != 0) {
+               if (block_count < 0) {
+                       /* Negating INT64_MIN overflows, which is undefined
+                          behavior, so treat it as malformed input */
+                       if (block_count == INT64_MIN) {
+                               return EILSEQ;
+                       }
+                       block_count = -block_count;
+                       rval = enc->read_long(reader, &block_size);
+                       if (rval) {
+                               return rval;
+                       }
+               }
+
+               for (i = 0; i < block_count; i++) {
+                       rval = avro_skip_data(reader, writers_schema->items);
+                       if (rval) {
+                               return rval;
+                       }
+               }
+
+               /* Count of the next block; zero terminates the array */
+               rval = enc->read_long(reader, &block_count);
+               if (rval) {
+                       return rval;
+               }
+       }
+       return 0;
+}
+
+/*
+ * Skip over a map value in the input.
+ *
+ * The map is consumed as a series of blocks.  Each block starts with a
+ * long entry count; a count of zero ends the map.  A negative count means
+ * its absolute value is the entry count and that a long block size
+ * follows, which is read here but not otherwise used.  Each entry is a
+ * string key followed by a value of the writer's value schema.
+ *
+ * Returns 0 on success, EILSEQ when a negative block count cannot be
+ * negated, or the non-zero code of the first failing read/skip call.
+ */
+static int skip_map(avro_reader_t reader, const avro_encoding_t * enc,
+                   struct avro_map_schema_t *writers_schema)
+{
+       int rval;
+       int64_t i, block_count;
+
+       rval = enc->read_long(reader, &block_count);
+       if (rval) {
+               return rval;
+       }
+       while (block_count != 0) {
+               int64_t block_size;
+               if (block_count < 0) {
+                       /* Negating INT64_MIN overflows, which is undefined
+                          behavior, so treat it as malformed input */
+                       if (block_count == INT64_MIN) {
+                               return EILSEQ;
+                       }
+                       block_count = -block_count;
+                       rval = enc->read_long(reader, &block_size);
+                       if (rval) {
+                               return rval;
+                       }
+               }
+               for (i = 0; i < block_count; i++) {
+                       /* Key first, then the value */
+                       rval = enc->skip_string(reader);
+                       if (rval) {
+                               return rval;
+                       }
+                       /* writers_schema is already the map schema struct,
+                          so its value schema is read directly */
+                       rval = avro_skip_data(reader, writers_schema->values);
+                       if (rval) {
+                               return rval;
+                       }
+               }
+               /* Count of the next block; zero terminates the map */
+               rval = enc->read_long(reader, &block_count);
+               if (rval) {
+                       return rval;
+               }
+       }
+       return 0;
+}
+
+/*
+ * Skip over a union value: read the long branch index, locate that branch
+ * in the writer's union schema and skip one value of the branch's schema.
+ *
+ * Returns 0 on success, EILSEQ when the index is negative or does not
+ * name a branch of the schema, or the non-zero code of the failing
+ * read/skip call.
+ */
+static int skip_union(avro_reader_t reader, const avro_encoding_t * enc,
+                     struct avro_union_schema_t *writers_schema)
+{
+       int rval;
+       int64_t i, index;
+       struct avro_union_branch_t *branch;
+
+       rval = enc->read_long(reader, &index);
+       if (rval) {
+               return rval;
+       }
+       if (index < 0) {
+               return EILSEQ;
+       }
+
+       /* Walk to the index-th branch.  The counter has to advance together
+          with the list pointer; otherwise every non-zero index runs off
+          the end of the list. */
+       branch = STAILQ_FIRST(&writers_schema->branches);
+       for (i = 0; i < index && branch != NULL; i++) {
+               branch = STAILQ_NEXT(branch, branches);
+       }
+       if (!branch) {
+               return EILSEQ;
+       }
+       return avro_skip_data(reader, branch->schema);
+}
+
+/*
+ * Skip over a record by skipping each field of the writer's schema in
+ * turn, in the order the fields appear in the schema's field list.
+ * The enc argument is not used by this function.
+ *
+ * Returns 0 once every field has been skipped, otherwise the first
+ * non-zero code returned by avro_skip_data().
+ */
+static int skip_record(avro_reader_t reader, const avro_encoding_t * enc,
+                      struct avro_record_schema_t *writers_schema)
+{
+       struct avro_record_field_t *cur;
+
+       cur = STAILQ_FIRST(&writers_schema->fields);
+       while (cur != NULL) {
+               int status = avro_skip_data(reader, cur->type);
+               if (status) {
+                       return status;
+               }
+               cur = STAILQ_NEXT(cur, fields);
+       }
+       return 0;
+}
+
+/*
+ * Skip one value described by writers_schema without building a datum.
+ *
+ * Primitive types are handed to the matching skip_* routine of the binary
+ * encoding, fixed values are skipped by their declared size, enums are
+ * skipped as a long, and arrays, maps, unions and records are handled by
+ * the static helpers in this file.  A link is followed to the schema it
+ * refers to.
+ *
+ * Returns EINVAL when reader is NULL, when writers_schema is not an Avro
+ * schema, or when its type is none of the cases below; otherwise returns
+ * whatever the selected skip routine returns.
+ */
+int avro_skip_data(avro_reader_t reader, avro_schema_t writers_schema)
+{
+       const avro_encoding_t *enc = &avro_binary_encoding;
+
+       if (!reader || !is_avro_schema(writers_schema)) {
+               return EINVAL;
+       }
+
+       switch (avro_typeof(writers_schema)) {
+       case AVRO_NULL:
+               return enc->skip_null(reader);
+
+       case AVRO_BOOLEAN:
+               return enc->skip_boolean(reader);
+
+       case AVRO_STRING:
+               return enc->skip_string(reader);
+
+       case AVRO_INT32:
+               return enc->skip_int(reader);
+
+       case AVRO_INT64:
+               return enc->skip_long(reader);
+
+       case AVRO_FLOAT:
+               return enc->skip_float(reader);
+
+       case AVRO_DOUBLE:
+               return enc->skip_double(reader);
+
+       case AVRO_BYTES:
+               return enc->skip_bytes(reader);
+
+       case AVRO_FIXED:
+               return avro_skip(reader,
+                                avro_schema_to_fixed(writers_schema)->size);
+
+       case AVRO_ENUM:
+               return enc->skip_long(reader);
+
+       case AVRO_ARRAY:
+               return skip_array(reader, enc,
+                                 avro_schema_to_array(writers_schema));
+
+       case AVRO_MAP:
+               return skip_map(reader, enc,
+                               avro_schema_to_map(writers_schema));
+
+       case AVRO_UNION:
+               return skip_union(reader, enc,
+                                 avro_schema_to_union(writers_schema));
+
+       case AVRO_RECORD:
+               return skip_record(reader, enc,
+                                  avro_schema_to_record(writers_schema));
+
+       case AVRO_LINK:
+               return avro_skip_data(reader,
+                                     (avro_schema_to_link(writers_schema))->to);
+       }
+
+       /* The type matched none of the cases above */
+       return EINVAL;
+}

Modified: hadoop/avro/trunk/lang/c/src/io.c
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/src/io.c?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/src/io.c (original)
+++ hadoop/avro/trunk/lang/c/src/io.c Thu Jan 28 02:06:31 2010
@@ -139,7 +139,7 @@
 static int
 avro_read_memory(struct avro_memory_reader_t *reader, void *buf, int64_t len)
 {
-       if (len) {
+       if (len > 0) {
                if ((reader->len - reader->read) < len) {
                        return ENOSPC;
                }
@@ -174,6 +174,42 @@
        return EINVAL;
 }
 
+/*
+ * Advance a memory-backed reader by len bytes without copying any data.
+ * A len of zero or less is a no-op that returns 0.  Returns ENOSPC when
+ * fewer than len bytes remain between reader->read and reader->len.
+ */
+static int avro_skip_memory(struct avro_memory_reader_t *reader, int64_t len)
+{
+       if (len <= 0) {
+               return 0;
+       }
+       if (len > (reader->len - reader->read)) {
+               return ENOSPC;
+       }
+       reader->read += len;
+       return 0;
+}
+
+/*
+ * Advance a file-backed reader by len bytes by seeking forward from the
+ * current position of reader->fp.  A len of zero or less is a no-op that
+ * returns 0.
+ *
+ * Returns 0 on success, EINVAL when len does not fit in the long offset
+ * taken by fseek(), or the errno value left by a failed fseek() (EIO if
+ * errno was not set).
+ */
+static int avro_skip_file(struct avro_file_reader_t *reader, int64_t len)
+{
+       long offset;
+
+       if (len <= 0) {
+               return 0;
+       }
+
+       /* fseek() takes a long, which may be narrower than int64_t; refuse
+          lengths that would not survive the conversion instead of seeking
+          to a truncated offset */
+       offset = (long)len;
+       if ((int64_t) offset != len) {
+               return EINVAL;
+       }
+
+       if (fseek(reader->fp, offset, SEEK_CUR) != 0) {
+               return errno ? errno : EIO;
+       }
+       return 0;
+}
+
+/*
+ * Advance reader by len bytes without returning the data, dispatching to
+ * the memory-backed or file-backed implementation.
+ *
+ * Returns the code of the selected implementation (0 on success).
+ * Returns EINVAL when reader is NULL, when len is negative, or when the
+ * reader is neither memory- nor file-backed, so that a skip which did not
+ * happen is never reported as success.
+ */
+int avro_skip(avro_reader_t reader, int64_t len)
+{
+       if (reader && len >= 0) {
+               if (is_memory_io(reader)) {
+                       return avro_skip_memory(avro_reader_to_memory(reader),
+                                               len);
+               } else if (is_file_io(reader)) {
+                       return avro_skip_file(avro_reader_to_file(reader), len);
+               }
+       }
+       return EINVAL;
+}
+
 static int
 avro_write_memory(struct avro_memory_writer_t *writer, void *buf, int64_t len)
 {
@@ -206,7 +242,7 @@
                if (is_memory_io(writer)) {
                        return avro_write_memory(avro_writer_to_memory(writer),
                                                 buf, len);
-               } else if (is_memory_io(writer)) {
+               } else if (is_file_io(writer)) {
                        return avro_write_file(avro_writer_to_file(writer), buf,
                                               len);
                }
@@ -214,14 +250,6 @@
        return EINVAL;
 }
 
-int avro_skip(avro_reader_t reader, int64_t len)
-{
-       /*
-        * TODO 
-        */
-       return -1;
-}
-
 void avro_writer_dump(avro_writer_t writer, FILE * fp)
 {
        if (is_memory_io(writer)) {

Modified: hadoop/avro/trunk/lang/c/tests/Makefile.am
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/tests/Makefile.am?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/tests/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c/tests/Makefile.am Thu Jan 28 02:06:31 2010
@@ -15,7 +15,4 @@
 test_avro_data_SOURCES=test_avro_data.c
 test_avro_data_LDADD=$(test_LDADD)
 
-test_avro_interop_SOURCES=test_avro_interop.c
-test_avro_interop_LDADD=$(test_LDADD)
-
 TESTS=$(check_PROGRAMS) test_valgrind

Modified: hadoop/avro/trunk/lang/c/tests/test_valgrind
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/tests/test_valgrind?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/tests/test_valgrind (original)
+++ hadoop/avro/trunk/lang/c/tests/test_valgrind Thu Jan 28 02:06:31 2010
@@ -22,8 +22,7 @@
        exit 77
 fi
 
-LD_LIBRARY_PATH="../src/.libs/:${LD_LIBRARY_PATH}"
-valgrind --leak-check=full --show-reachable=yes -q .libs/test_avro_data 2>&1 |\
+LD_LIBRARY_PATH="../src/.libs/" valgrind --leak-check=full --show-reachable=yes -q .libs/test_avro_data 2>&1 |\
 grep -E '^==[0-9]+== '
 if [ $? -eq 0 ]; then
        # Expression found. Test failed.

Modified: hadoop/avro/trunk/lang/c/version.sh
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c/version.sh?rev=903941&r1=903940&r2=903941&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c/version.sh (original)
+++ hadoop/avro/trunk/lang/c/version.sh Thu Jan 28 02:06:31 2010
@@ -18,7 +18,7 @@
 #         libavro_binary_age = 0
 #         libavro_interface_age = 0
 #
-libavro_micro_version=14
+libavro_micro_version=15
 libavro_interface_age=0
 libavro_binary_age=0
 


Reply via email to