Author: dcreager
Date: Fri Feb 25 16:22:55 2011
New Revision: 1074618

URL: http://svn.apache.org/viewvc?rev=1074618&view=rev
Log:
AVRO-762. C: Recursive schema resolution

The avro_resolver_new function now works correctly with recursive
schemas.  We had to memoize the results of avro_resolver_new, so that we
can detect when we've already created an avro_resolver_t instance for a
pair of schemas.  This prevents us from going into an infinite loop
trying to resolve a recursive schema.

This memoization means that there might be multiple references to a
resolver within the graph of resolvers for a recursive schema type.
When freeing this graph of resolvers, we now have to take care to only
free each instance once.  We do this by maintaining a set of the
resolvers that we've already encountered during the free operation, and
immediately returning if we try to start freeing a resolver object a
second time.

Modified:
    avro/trunk/lang/c/src/avro.h
    avro/trunk/lang/c/src/consumer.c
    avro/trunk/lang/c/src/resolver.c
    avro/trunk/lang/c/src/schema.c
    avro/trunk/lang/c/tests/test_avro_data.c

Modified: avro/trunk/lang/c/src/avro.h
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/avro.h?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/avro.h (original)
+++ avro/trunk/lang/c/src/avro.h Fri Feb 25 16:22:55 2011
@@ -177,6 +177,7 @@ avro_schema_t avro_schema_union_branch_b
 (avro_schema_t union_schema, int *branch_index, const char *name);
 
 avro_schema_t avro_schema_link(avro_schema_t schema);
+avro_schema_t avro_schema_link_target(avro_schema_t schema);
 
 typedef struct avro_schema_error_t_ *avro_schema_error_t;
 int avro_schema_from_json(const char *jsontext,

Modified: avro/trunk/lang/c/src/consumer.c
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/consumer.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/consumer.c (original)
+++ avro/trunk/lang/c/src/consumer.c Fri Feb 25 16:22:55 2011
@@ -19,7 +19,5 @@
 
 void avro_consumer_free(avro_consumer_t *consumer)
 {
-       if (consumer->free) {
-               consumer->free(consumer);
-       }
+       consumer->free(consumer);
 }

Modified: avro/trunk/lang/c/src/resolver.c
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/resolver.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/resolver.c (original)
+++ avro/trunk/lang/c/src/resolver.c Fri Feb 25 16:22:55 2011
@@ -24,6 +24,7 @@
 #include "avro_errors.h"
 #include "avro_private.h"
 #include "allocation.h"
+#include "st.h"
 
 
 #if !defined(DEBUG_RESOLVER)
@@ -66,10 +67,31 @@ struct avro_resolver_t {
 };
 
 
+/**
+ * Frees a resolver object, while ensuring that all of the resolvers in
+ * a graph of resolvers is only freed once.
+ */
+
 static void
-avro_resolver_free(avro_consumer_t *consumer)
+avro_resolver_free_cycles(avro_consumer_t *consumer, st_table *freeing)
 {
        avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
+
+       /*
+        * First check if we've already started freeing this resolver.
+        */
+
+       if (st_lookup(freeing, (st_data_t) resolver, NULL)) {
+               return;
+       }
+
+       /*
+        * Otherwise add this resolver to the freeing set, and then
+        * actually free the thing.
+        */
+
+       st_insert(freeing, (st_data_t) resolver, (st_data_t) NULL);
+
        avro_schema_decref(resolver->parent.schema);
        avro_schema_decref(resolver->rschema);
        if (resolver->child_resolvers) {
@@ -77,7 +99,7 @@ avro_resolver_free(avro_consumer_t *cons
                for (i = 0; i < resolver->num_children; i++) {
                        avro_consumer_t  *child = resolver->child_resolvers[i];
                        if (child) {
-                               avro_consumer_free(child);
+                               avro_resolver_free_cycles(child, freeing);
                        }
                }
                avro_free(resolver->child_resolvers,
@@ -91,6 +113,14 @@ avro_resolver_free(avro_consumer_t *cons
 }
 
 
+static void
+avro_resolver_free(avro_consumer_t *consumer)
+{
+       st_table  *freeing = st_init_numtable();
+       avro_resolver_free_cycles(consumer, freeing);
+       st_free_table(freeing);
+}
+
 /**
  * Create a new avro_resolver_t instance.  You must fill in the callback
  * pointers that are appropriate for the writer schema after this
@@ -135,6 +165,68 @@ avro_resolver_get_real_dest(avro_resolve
 }
 
 
+#define skip_links(schema)                                     \
+       while (is_avro_link(schema)) {                          \
+               schema = avro_schema_link_target(schema);       \
+       }
+
+
+/*-----------------------------------------------------------------------
+ * Memoized resolvers
+ */
+
+static int
+avro_resolver_cmp(avro_resolver_t *a, avro_resolver_t *b)
+{
+       return (a->parent.schema != b->parent.schema) || (a->rschema != 
b->rschema);
+}
+
+static int
+avro_resolver_hash(avro_resolver_t *a)
+{
+       return ((uintptr_t) a->parent.schema) ^ ((uintptr_t) a->rschema);
+}
+
+static struct st_hash_type  avro_resolver_hash_type = {
+       avro_resolver_cmp,
+       avro_resolver_hash
+};
+
+static void
+save_resolver(st_table *resolvers, avro_resolver_t *resolver)
+{
+       st_insert(resolvers, (st_data_t) resolver, (st_data_t) resolver);
+}
+
+static void
+delete_resolver(st_table *resolvers, avro_resolver_t *resolver)
+{
+       st_delete(resolvers, (st_data_t *) &resolver, (st_data_t *) &resolver);
+}
+
+static avro_resolver_t *
+find_resolver(st_table *resolvers, avro_schema_t wschema, avro_schema_t 
rschema)
+{
+       avro_resolver_t  dummy;
+       dummy.parent.schema = wschema;
+       dummy.rschema = rschema;
+       union {
+               st_data_t  data;
+               avro_resolver_t  *resolver;
+       } val;
+       if (st_lookup(resolvers, (st_data_t) &dummy, &val.data)) {
+               return val.resolver;
+       } else {
+               return NULL;
+       }
+}
+
+
+static avro_consumer_t *
+avro_resolver_new_memoized(st_table *resolvers,
+                          avro_schema_t wschema, avro_schema_t rschema);
+
+
 /*-----------------------------------------------------------------------
  * Reader unions
  */
@@ -153,10 +245,10 @@ avro_resolver_get_real_dest(avro_resolve
  * compatible.
  */
 
-#define check_non_union(wschema, rschema, check_func)          \
+#define check_non_union(saved, wschema, rschema, check_func)   \
 do {                                                           \
        avro_resolver_t  *self = NULL;                          \
-       int  rc = check_func(&self, wschema, rschema,           \
+       int  rc = check_func(saved, &self, wschema, rschema,    \
                             rschema);                          \
        if (self) {                                             \
                debug("Non-union schemas %s (writer) "          \
@@ -182,7 +274,7 @@ do {                                                        
        \
  * check the compatiblity of each branch schema.
  */
 
-#define check_reader_union(wschema, rschema, check_func)               \
+#define check_reader_union(saved, wschema, rschema, check_func)                
\
 do {                                                                   \
        if (!is_avro_union(rschema)) {                                  \
                break;                                                  \
@@ -195,8 +287,10 @@ do {                                                       
                \
        for (i = 0; i < num_branches; i++) {                            \
                avro_schema_t  branch_schema =                          \
                    avro_schema_union_branch(rschema, i);               \
+               skip_links(branch_schema);                              \
                avro_resolver_t  *self = NULL;                          \
-               int  rc = check_func(&self, wschema, branch_schema,     \
+               int  rc = check_func(saved, &self,                      \
+                                    wschema, branch_schema,            \
                                     rschema);                          \
                if (self) {                                             \
                        debug("Reader union branch %d (%s) "            \
@@ -205,6 +299,10 @@ do {                                                       
                \
                              avro_schema_type_name(wschema));          \
                        self->reader_union_branch = i;                  \
                        return &self->parent;                           \
+               } else {                                                \
+                       debug("Reader union branch %d (%s) "            \
+                             "doesn't match",                          \
+                             i, avro_schema_type_name(branch_schema)); \
                }                                                       \
                                                                        \
                if (rc) {                                               \
@@ -220,10 +318,10 @@ do {                                                      
                \
  * check_reader_union for a simple (non-union) writer schema type.
  */
 
-#define check_simple_writer(wschema, rschema, type_name)               \
+#define check_simple_writer(saved, wschema, rschema, type_name)                
\
 do {                                                                   \
-       check_non_union(wschema, rschema, try_##type_name);             \
-       check_reader_union(wschema, rschema, try_##type_name);          \
+       check_non_union(saved, wschema, rschema, try_##type_name);      \
+       check_reader_union(saved, wschema, rschema, try_##type_name);   \
        debug("Writer %s doesn't match reader %s",                      \
              avro_schema_type_name(wschema),                           \
              avro_schema_type_name(rschema));                          \
@@ -249,12 +347,13 @@ avro_resolver_boolean_value(avro_consume
 }
 
 static int
-try_boolean(avro_resolver_t **resolver,
+try_boolean(st_table *resolvers, avro_resolver_t **resolver,
            avro_schema_t wschema, avro_schema_t rschema,
            avro_schema_t root_rschema)
 {
        if (is_avro_boolean(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.boolean_value = avro_resolver_boolean_value;
        }
        return 0;
@@ -284,12 +383,13 @@ avro_resolver_bytes_value(avro_consumer_
 }
 
 static int
-try_bytes(avro_resolver_t **resolver,
+try_bytes(st_table *resolvers, avro_resolver_t **resolver,
          avro_schema_t wschema, avro_schema_t rschema,
          avro_schema_t root_rschema)
 {
        if (is_avro_bytes(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.bytes_value = avro_resolver_bytes_value;
        }
        return 0;
@@ -308,12 +408,13 @@ avro_resolver_double_value(avro_consumer
 }
 
 static int
-try_double(avro_resolver_t **resolver,
+try_double(st_table *resolvers, avro_resolver_t **resolver,
           avro_schema_t wschema, avro_schema_t rschema,
           avro_schema_t root_rschema)
 {
        if (is_avro_double(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.double_value = avro_resolver_double_value;
        }
        return 0;
@@ -343,16 +444,18 @@ avro_resolver_float_double_value(avro_co
 }
 
 static int
-try_float(avro_resolver_t **resolver,
+try_float(st_table *resolvers, avro_resolver_t **resolver,
          avro_schema_t wschema, avro_schema_t rschema,
          avro_schema_t root_rschema)
 {
        if (is_avro_float(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.float_value = avro_resolver_float_value;
        }
        else if (is_avro_double(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.float_value = 
avro_resolver_float_double_value;
        }
        return 0;
@@ -404,24 +507,28 @@ avro_resolver_int_float_value(avro_consu
 }
 
 static int
-try_int(avro_resolver_t **resolver,
+try_int(st_table *resolvers, avro_resolver_t **resolver,
        avro_schema_t wschema, avro_schema_t rschema,
        avro_schema_t root_rschema)
 {
        if (is_avro_int32(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.int_value = avro_resolver_int_value;
        }
        else if (is_avro_int64(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.int_value = avro_resolver_int_long_value;
        }
        else if (is_avro_double(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.int_value = avro_resolver_int_double_value;
        }
        else if (is_avro_float(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.int_value = avro_resolver_int_float_value;
        }
        return 0;
@@ -462,20 +569,23 @@ avro_resolver_long_double_value(avro_con
 }
 
 static int
-try_long(avro_resolver_t **resolver,
+try_long(st_table *resolvers, avro_resolver_t **resolver,
         avro_schema_t wschema, avro_schema_t rschema,
         avro_schema_t root_rschema)
 {
        if (is_avro_int64(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.long_value = avro_resolver_long_value;
        }
        else if (is_avro_double(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.long_value = 
avro_resolver_long_double_value;
        }
        else if (is_avro_float(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.long_value = avro_resolver_long_float_value;
        }
        return 0;
@@ -485,25 +595,23 @@ try_long(avro_resolver_t **resolver,
 static int
 avro_resolver_null_value(avro_consumer_t *consumer, void *user_data)
 {
-#if DEBUG_RESOLVER
        avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
        avro_datum_t  ud_dest = user_data;
        avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+
+       AVRO_UNUSED(dest);
        debug("Storing null into %p", dest);
-#else
-       AVRO_UNUSED(consumer);
-       AVRO_UNUSED(user_data);
-#endif
        return 0;
 }
 
 static int
-try_null(avro_resolver_t **resolver,
+try_null(st_table *resolvers, avro_resolver_t **resolver,
         avro_schema_t wschema, avro_schema_t rschema,
         avro_schema_t root_rschema)
 {
        if (is_avro_null(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.null_value = avro_resolver_null_value;
        }
        return 0;
@@ -524,12 +632,13 @@ avro_resolver_string_value(avro_consumer
 }
 
 static int
-try_string(avro_resolver_t **resolver,
+try_string(st_table *resolvers, avro_resolver_t **resolver,
           avro_schema_t wschema, avro_schema_t rschema,
           avro_schema_t root_rschema)
 {
        if (is_avro_string(rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.string_value = avro_resolver_string_value;
        }
        return 0;
@@ -546,18 +655,14 @@ avro_resolver_array_start_block(avro_con
                                unsigned int block_count,
                                void *user_data)
 {
-#if DEBUG_RESOLVER
        if (is_first_block) {
                avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
                avro_datum_t  ud_dest = user_data;
                avro_datum_t  dest = avro_resolver_get_real_dest(resolver, 
ud_dest);
+               AVRO_UNUSED(dest);
+
                debug("Starting array %p", dest);
        }
-#else
-       AVRO_UNUSED(consumer);
-       AVRO_UNUSED(user_data);
-       AVRO_UNUSED(is_first_block);
-#endif
 
        AVRO_UNUSED(block_count);
        return 0;
@@ -580,6 +685,7 @@ avro_resolver_array_element(avro_consume
        /*
         * Allocate a new element datum and add it to the array.
         */
+
        avro_schema_t  array_schema = avro_datum_get_schema(dest);
        avro_schema_t  item_schema = avro_schema_array_items(array_schema);
        avro_datum_t  element = avro_datum_from_schema(item_schema);
@@ -590,13 +696,14 @@ avro_resolver_array_element(avro_consume
         * Return the consumer that we allocated to process the array's
         * children.
         */
+
        *element_consumer = resolver->child_resolvers[0];
        *element_user_data = element;
        return 0;
 }
 
 static int
-try_array(avro_resolver_t **resolver,
+try_array(st_table *resolvers, avro_resolver_t **resolver,
          avro_schema_t wschema, avro_schema_t rschema,
          avro_schema_t root_rschema)
 {
@@ -614,11 +721,17 @@ try_array(avro_resolver_t **resolver,
         * check the compatibility.
         */
 
+       *resolver = avro_resolver_create(wschema, root_rschema);
+       save_resolver(resolvers, *resolver);
+
        avro_schema_t  witems = avro_schema_array_items(wschema);
        avro_schema_t  ritems = avro_schema_array_items(rschema);
 
-       avro_consumer_t  *item_consumer = avro_resolver_new(witems, ritems);
+       avro_consumer_t  *item_consumer =
+           avro_resolver_new_memoized(resolvers, witems, ritems);
        if (!item_consumer) {
+               delete_resolver(resolvers, *resolver);
+               avro_consumer_free(&(*resolver)->parent);
                avro_prefix_error("Array values aren't compatible: ");
                return EINVAL;
        }
@@ -629,7 +742,6 @@ try_array(avro_resolver_t **resolver,
         * resolver into the child_resolvers field.
         */
 
-       *resolver = avro_resolver_create(wschema, root_rschema);
        (*resolver)->num_children = 1;
        (*resolver)->child_resolvers = avro_calloc(1, sizeof(avro_consumer_t 
*));
        (*resolver)->child_resolvers[0] = item_consumer;
@@ -660,7 +772,7 @@ avro_resolver_enum_value(avro_consumer_t
 }
 
 static int
-try_enum(avro_resolver_t **resolver,
+try_enum(st_table *resolvers, avro_resolver_t **resolver,
         avro_schema_t wschema, avro_schema_t rschema,
         avro_schema_t root_rschema)
 {
@@ -675,6 +787,7 @@ try_enum(avro_resolver_t **resolver,
 
                if (!strcmp(wname, rname)) {
                        *resolver = avro_resolver_create(wschema, root_rschema);
+                       save_resolver(resolvers, *resolver);
                        (*resolver)->parent.enum_value = 
avro_resolver_enum_value;
                }
        }
@@ -699,7 +812,7 @@ avro_resolver_fixed_value(avro_consumer_
 }
 
 static int
-try_fixed(avro_resolver_t **resolver,
+try_fixed(st_table *resolvers, avro_resolver_t **resolver,
          avro_schema_t wschema, avro_schema_t rschema,
          avro_schema_t root_rschema)
 {
@@ -709,6 +822,7 @@ try_fixed(avro_resolver_t **resolver,
 
        if (avro_schema_equal(wschema, rschema)) {
                *resolver = avro_resolver_create(wschema, root_rschema);
+               save_resolver(resolvers, *resolver);
                (*resolver)->parent.fixed_value = avro_resolver_fixed_value;
        }
        return 0;
@@ -725,18 +839,14 @@ avro_resolver_map_start_block(avro_consu
                              unsigned int block_count,
                              void *user_data)
 {
-#if DEBUG_RESOLVER
        if (is_first_block) {
                avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
                avro_datum_t  ud_dest = user_data;
                avro_datum_t  dest = avro_resolver_get_real_dest(resolver, 
ud_dest);
+               AVRO_UNUSED(dest);
+
                debug("Starting map %p", dest);
        }
-#else
-       AVRO_UNUSED(consumer);
-       AVRO_UNUSED(user_data);
-       AVRO_UNUSED(is_first_block);
-#endif
 
        AVRO_UNUSED(block_count);
        return 0;
@@ -778,7 +888,7 @@ avro_resolver_map_element(avro_consumer_
 }
 
 static int
-try_map(avro_resolver_t **resolver,
+try_map(st_table *resolvers, avro_resolver_t **resolver,
        avro_schema_t wschema, avro_schema_t rschema,
        avro_schema_t root_rschema)
 {
@@ -796,11 +906,17 @@ try_map(avro_resolver_t **resolver,
         * check the compatibility.
         */
 
+       *resolver = avro_resolver_create(wschema, root_rschema);
+       save_resolver(resolvers, *resolver);
+
        avro_schema_t  wvalues = avro_schema_map_values(wschema);
        avro_schema_t  rvalues = avro_schema_map_values(rschema);
 
-       avro_consumer_t  *value_consumer = avro_resolver_new(wvalues, rvalues);
+       avro_consumer_t  *value_consumer =
+           avro_resolver_new_memoized(resolvers, wvalues, rvalues);
        if (!value_consumer) {
+               delete_resolver(resolvers, *resolver);
+               avro_consumer_free(&(*resolver)->parent);
                avro_prefix_error("Map values aren't compatible: ");
                return EINVAL;
        }
@@ -811,7 +927,6 @@ try_map(avro_resolver_t **resolver,
         * resolver into the child_resolvers field.
         */
 
-       *resolver = avro_resolver_create(wschema, root_rschema);
        (*resolver)->num_children = 1;
        (*resolver)->child_resolvers = avro_calloc(1, sizeof(avro_consumer_t 
*));
        (*resolver)->child_resolvers[0] = value_consumer;
@@ -830,15 +945,12 @@ static int
 avro_resolver_record_start(avro_consumer_t *consumer,
                           void *user_data)
 {
-#if DEBUG_RESOLVER
        avro_resolver_t  *resolver = (avro_resolver_t *) consumer;
        avro_datum_t  ud_dest = user_data;
        avro_datum_t  dest = avro_resolver_get_real_dest(resolver, ud_dest);
+       AVRO_UNUSED(dest);
+
        debug("Starting record at %p", dest);
-#else
-       AVRO_UNUSED(consumer);
-       AVRO_UNUSED(user_data);
-#endif
 
        /*
         * TODO: Eventually, we'll fill in default values for the extra
@@ -890,7 +1002,7 @@ avro_resolver_record_field(avro_consumer
 }
 
 static int
-try_record(avro_resolver_t **resolver,
+try_record(st_table *resolvers, avro_resolver_t **resolver,
           avro_schema_t wschema, avro_schema_t rschema,
           avro_schema_t root_rschema)
 {
@@ -929,6 +1041,9 @@ try_record(avro_resolver_t **resolver,
         * skipped when processing the input.
         */
 
+       *resolver = avro_resolver_create(wschema, root_rschema);
+       save_resolver(resolvers, *resolver);
+
        size_t  wfields = avro_schema_record_size(wschema);
        size_t  rfields = avro_schema_record_size(rschema);
 
@@ -973,7 +1088,8 @@ try_record(avro_resolver_t **resolver,
 
                avro_schema_t  wfield =
                    avro_schema_record_field_get_by_index(wschema, wi);
-               avro_consumer_t  *field_resolver = avro_resolver_new(wfield, 
rfield);
+               avro_consumer_t  *field_resolver =
+                   avro_resolver_new_memoized(resolvers, wfield, rfield);
 
                if (!field_resolver) {
                        avro_prefix_error("Field %s isn't compatible: ", 
field_name);
@@ -995,7 +1111,6 @@ try_record(avro_resolver_t **resolver,
         * but that's okay — any extras will be ignored.
         */
 
-       *resolver = avro_resolver_create(wschema, root_rschema);
        (*resolver)->num_children = wfields;
        (*resolver)->child_resolvers = child_resolvers;
        (*resolver)->index_mapping = index_mapping;
@@ -1008,6 +1123,9 @@ error:
         * Clean up any consumer we might have already created.
         */
 
+       delete_resolver(resolvers, *resolver);
+       avro_consumer_free(&(*resolver)->parent);
+
        {
                unsigned int  i;
                for (i = 0; i < wfields; i++) {
@@ -1060,7 +1178,7 @@ avro_resolver_union_branch(avro_consumer
 }
 
 static avro_consumer_t *
-try_union(avro_schema_t wschema, avro_schema_t rschema)
+try_union(st_table *resolvers, avro_schema_t wschema, avro_schema_t rschema)
 {
        /*
         * For a writer union, we recursively try to resolve each branch
@@ -1085,6 +1203,9 @@ try_union(avro_schema_t wschema, avro_sc
        size_t  num_branches = avro_schema_union_size(wschema);
        debug("Checking %zu-branch writer union schema", num_branches);
 
+       avro_resolver_t  *resolver = avro_resolver_create(wschema, rschema);
+       save_resolver(resolvers, resolver);
+
        avro_consumer_t  **child_resolvers =
            avro_calloc(num_branches, sizeof(avro_consumer_t *));
        int  some_branch_compatible = 0;
@@ -1105,7 +1226,8 @@ try_union(avro_schema_t wschema, avro_sc
                 * appear in the input.
                 */
 
-               child_resolvers[i] = avro_resolver_new(branch_schema, rschema);
+               child_resolvers[i] =
+                   avro_resolver_new_memoized(resolvers, branch_schema, 
rschema);
                if (child_resolvers[i]) {
                        debug("Found match for writer union branch %u", i);
                        some_branch_compatible = 1;
@@ -1128,7 +1250,6 @@ try_union(avro_schema_t wschema, avro_sc
                goto error;
        }
 
-       avro_resolver_t  *resolver = avro_resolver_create(wschema, rschema);
        resolver->num_children = num_branches;
        resolver->child_resolvers = child_resolvers;
        resolver->parent.union_branch = avro_resolver_union_branch;
@@ -1139,6 +1260,9 @@ error:
         * Clean up any consumer we might have already created.
         */
 
+       delete_resolver(resolvers, resolver);
+       avro_consumer_free(&resolver->parent);
+
        for (i = 0; i < num_branches; i++) {
                if (child_resolvers[i]) {
                        avro_consumer_free(child_resolvers[i]);
@@ -1154,68 +1278,89 @@ error:
  * schema type dispatcher
  */
 
-avro_consumer_t *
-avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
+static avro_consumer_t *
+avro_resolver_new_memoized(st_table *resolvers,
+                          avro_schema_t wschema, avro_schema_t rschema)
 {
        check_param(NULL, is_avro_schema(wschema), "writer schema");
        check_param(NULL, is_avro_schema(rschema), "reader schema");
 
+       skip_links(wschema);
+       skip_links(rschema);
+
+       /*
+        * First see if we've already matched these two schemas.  If so,
+        * just return that resolver.
+        */
+
+       avro_resolver_t  *saved = find_resolver(resolvers, wschema, rschema);
+       if (saved) {
+               debug("Already resolved %s and %s",
+                     avro_schema_type_name(wschema),
+                     avro_schema_type_name(rschema));
+               return &saved->parent;
+       }
+
+       /*
+        * Otherwise we have some work to do.
+        */
+
        switch (avro_typeof(wschema))
        {
                case AVRO_BOOLEAN:
-                       check_simple_writer(wschema, rschema, boolean);
+                       check_simple_writer(resolvers, wschema, rschema, 
boolean);
                        return NULL;
 
                case AVRO_BYTES:
-                       check_simple_writer(wschema, rschema, bytes);
+                       check_simple_writer(resolvers, wschema, rschema, bytes);
                        return NULL;
 
                case AVRO_DOUBLE:
-                       check_simple_writer(wschema, rschema, double);
+                       check_simple_writer(resolvers, wschema, rschema, 
double);
                        return NULL;
 
                case AVRO_FLOAT:
-                       check_simple_writer(wschema, rschema, float);
+                       check_simple_writer(resolvers, wschema, rschema, float);
                        return NULL;
 
                case AVRO_INT32:
-                       check_simple_writer(wschema, rschema, int);
+                       check_simple_writer(resolvers, wschema, rschema, int);
                        return NULL;
 
                case AVRO_INT64:
-                       check_simple_writer(wschema, rschema, long);
+                       check_simple_writer(resolvers, wschema, rschema, long);
                        return NULL;
 
                case AVRO_NULL:
-                       check_simple_writer(wschema, rschema, null);
+                       check_simple_writer(resolvers, wschema, rschema, null);
                        return NULL;
 
                case AVRO_STRING:
-                       check_simple_writer(wschema, rschema, string);
+                       check_simple_writer(resolvers, wschema, rschema, 
string);
                        return NULL;
 
                case AVRO_ARRAY:
-                       check_simple_writer(wschema, rschema, array);
+                       check_simple_writer(resolvers, wschema, rschema, array);
                        return NULL;
 
                case AVRO_ENUM:
-                       check_simple_writer(wschema, rschema, enum);
+                       check_simple_writer(resolvers, wschema, rschema, enum);
                        return NULL;
 
                case AVRO_FIXED:
-                       check_simple_writer(wschema, rschema, fixed);
+                       check_simple_writer(resolvers, wschema, rschema, fixed);
                        return NULL;
 
                case AVRO_MAP:
-                       check_simple_writer(wschema, rschema, map);
+                       check_simple_writer(resolvers, wschema, rschema, map);
                        return NULL;
 
                case AVRO_RECORD:
-                       check_simple_writer(wschema, rschema, record);
+                       check_simple_writer(resolvers, wschema, rschema, 
record);
                        return NULL;
 
                case AVRO_UNION:
-                       return try_union(wschema, rschema);
+                       return try_union(resolvers, wschema, rschema);
 
                default:
                        avro_set_error("Unknown schema type");
@@ -1224,3 +1369,14 @@ avro_resolver_new(avro_schema_t wschema,
 
        return NULL;
 }
+
+
+avro_consumer_t *
+avro_resolver_new(avro_schema_t wschema, avro_schema_t rschema)
+{
+       st_table  *resolvers = st_init_table(&avro_resolver_hash_type);
+       avro_consumer_t  *result =
+           avro_resolver_new_memoized(resolvers, wschema, rschema);
+       st_free_table(resolvers);
+       return result;
+}

Modified: avro/trunk/lang/c/src/schema.c
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/c/src/schema.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/src/schema.c (original)
+++ avro/trunk/lang/c/src/schema.c Fri Feb 25 16:22:55 2011
@@ -695,6 +695,15 @@ avro_schema_t avro_schema_link(avro_sche
        return &link->obj;
 }
 
+avro_schema_t avro_schema_link_target(avro_schema_t schema)
+{
+       check_param(NULL, is_avro_schema(schema), "schema");
+       check_param(NULL, is_avro_link(schema), "schema");
+
+       struct avro_link_schema_t *link = avro_schema_to_link(schema);
+       return link->to;
+}
+
 static int
 avro_type_from_json_t(json_t * json, avro_type_t * type,
                      avro_schema_error_t * error, avro_schema_t * named_type)
@@ -1320,37 +1329,40 @@ const char *avro_schema_name(const avro_
 
 const char *avro_schema_type_name(const avro_schema_t schema)
 {
- if (is_avro_record(schema)) {
-   return (avro_schema_to_record(schema))->name;
- } else if (is_avro_enum(schema)) {
-   return (avro_schema_to_enum(schema))->name;
- } else if (is_avro_fixed(schema)) {
-   return (avro_schema_to_fixed(schema))->name;
- } else if (is_avro_union(schema)) {
-   return "union";
- } else if (is_avro_array(schema)) {
-   return "array";
- } else if (is_avro_map(schema)) {
-   return "map";
- } else if (is_avro_int32(schema)) {
-   return "int";
- } else if (is_avro_int64(schema)) {
-   return "long";
- } else if (is_avro_float(schema)) {
-   return "float";
- } else if (is_avro_double(schema)) {
-   return "double";
- } else if (is_avro_boolean(schema)) {
-   return "boolean";
- } else if (is_avro_null(schema)) {
-   return "null";
- } else if (is_avro_string(schema)) {
-   return "string";
- } else if (is_avro_bytes(schema)) {
-   return "bytes";
- }
- avro_set_error("Unknown schema type");
- return NULL;
+       if (is_avro_record(schema)) {
+               return (avro_schema_to_record(schema))->name;
+       } else if (is_avro_enum(schema)) {
+               return (avro_schema_to_enum(schema))->name;
+       } else if (is_avro_fixed(schema)) {
+               return (avro_schema_to_fixed(schema))->name;
+       } else if (is_avro_union(schema)) {
+               return "union";
+       } else if (is_avro_array(schema)) {
+               return "array";
+       } else if (is_avro_map(schema)) {
+               return "map";
+       } else if (is_avro_int32(schema)) {
+               return "int";
+       } else if (is_avro_int64(schema)) {
+               return "long";
+       } else if (is_avro_float(schema)) {
+               return "float";
+       } else if (is_avro_double(schema)) {
+               return "double";
+       } else if (is_avro_boolean(schema)) {
+               return "boolean";
+       } else if (is_avro_null(schema)) {
+               return "null";
+       } else if (is_avro_string(schema)) {
+               return "string";
+       } else if (is_avro_bytes(schema)) {
+               return "bytes";
+       } else if (is_avro_link(schema)) {
+               avro_schema_t  target = avro_schema_link_target(schema);
+               return avro_schema_type_name(target);
+       }
+       avro_set_error("Unknown schema type");
+       return NULL;
 }
 
 avro_datum_t avro_datum_from_schema(const avro_schema_t schema)

Modified: avro/trunk/lang/c/tests/test_avro_data.c
URL: 
http://svn.apache.org/viewvc/avro/trunk/lang/c/tests/test_avro_data.c?rev=1074618&r1=1074617&r2=1074618&view=diff
==============================================================================
--- avro/trunk/lang/c/tests/test_avro_data.c (original)
+++ avro/trunk/lang/c/tests/test_avro_data.c Fri Feb 25 16:22:55 2011
@@ -382,6 +382,48 @@ static int test_record(void)
        return 0;
 }
 
+static int test_nested_record(void)
+{
+       const char  *json =
+               "{"
+               "  \"type\": \"record\","
+               "  \"name\": \"list\","
+               "  \"fields\": ["
+               "    { \"name\": \"x\", \"type\": \"int\" },"
+               "    { \"name\": \"y\", \"type\": \"int\" },"
+               "    { \"name\": \"next\", \"type\": [\"null\",\"list\"]}"
+               "  ]"
+               "}";
+
+       int  rval;
+
+       avro_schema_t schema = NULL;
+       avro_schema_error_t error;
+       avro_schema_from_json(json, strlen(json), &schema, &error);
+
+       avro_datum_t  head = avro_datum_from_schema(schema);
+       avro_record_set_field_value(rval, head, int32, "x", 10);
+       avro_record_set_field_value(rval, head, int32, "y", 10);
+
+       avro_datum_t  next = NULL;
+       avro_datum_t  tail = NULL;
+
+       avro_record_get(head, "next", &next);
+       avro_union_set_discriminant(next, 1, &tail);
+       avro_record_set_field_value(rval, tail, int32, "x", 20);
+       avro_record_set_field_value(rval, tail, int32, "y", 20);
+
+       avro_record_get(tail, "next", &next);
+       avro_union_set_discriminant(next, 0, NULL);
+
+       write_read_check(schema, head, NULL, NULL, "nested record");
+
+       avro_schema_decref(schema);
+       avro_datum_decref(head);
+
+       return 0;
+}
+
 static int test_enum(void)
 {
        enum avro_languages {
@@ -604,6 +646,7 @@ int main(void)
                "boolean", test_boolean}, {
                "null", test_null}, {
                "record", test_record}, {
+               "nested_record", test_nested_record}, {
                "enum", test_enum}, {
                "array", test_array}, {
                "map", test_map}, {


Reply via email to