SG,

Now that logical types are in, I had some time to look at this issue. Thanks for your patience on this.

When I started looking at the use case, this began to look very much like a logical type issue. (I know, I've been saying that a lot.) When you write, you replace any referenced object with its id. When you read, you replace those ids with the correct object. I went ahead and implemented this using 2 logical types: Referenceable for an object with an id, and Reference for a record with a field that references another object by id.

There were some trade-offs to this approach. For example, I had to use a logical type for the object containing the reference rather than for the reference itself because I used a callback to set the object once it is found. That happens because children with references to a parent are deserialized completely first. The parent is the last object to be assembled and passed to the logical type conversion.

A bigger issue was that logical types are currently conservative and don't overlap with reflect types or anything that sets java-class. That means that this currently only works with generic types. But, I'd rather make logical types work for reflect than add more custom code to support this. Does that sound reasonable?

I'm attaching a diff with the working test code so you can take a look at the approach. Let me know what you are thinking.

rb

On 05/20/2015 12:05 PM, S G wrote:
I am requesting some help with AVRO-695.
Here are some pieces from the last conversation.


Doug Cutting
<https://issues.apache.org/jira/secure/ViewProfile.jspa?name=cutting> added
a comment - 02/Oct/14 21:19

Here's a modified version of the patch. It moves all significant changes to
reflect. Since reflect is the only implementation that can generate an
appropriate schema, changes should be confined to there.

The tests need to be updated, as they still assume generic.
<https://issues.apache.org/jira/browse/AVRO-695#>
<https://issues.apache.org/jira/browse/AVRO-695?focusedCommentId=14286370&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286370>
Sachin Goyal
<https://issues.apache.org/jira/secure/ViewProfile.jspa?name=sachingoyal> added
a comment - 21/Jan/15 21:56

Here is a patch with the updated test-cases.
I also confirm that all my changes are there in the patch.


The patch was submitted in June 2014 and was very hot till October 2014.
Since then, there has been no action on this even though I have sent many
reminders in this group.

I understand that everyone is very busy with their own stuff but I would
really appreciate if someone could help a fellow engineer in getting his
patch accepted.
It would encourage more participation as well as help people wanting to use
Avro for circular references.

Regards
SG


--
Ryan Blue
Software Engineer
Cloudera, Inc.
diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
index 903dfbf..2523ae9 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
@@ -66,9 +66,11 @@ public class GenericDatumWriter<D> implements DatumWriter<D> {
     LogicalType logicalType = schema.getLogicalType();
     if (datum != null && logicalType != null) {
       Conversion<?> conversion = getData()
-          .getConversionFrom(datum.getClass(), logicalType);
-      writeWithoutConversion(schema,
-          convert(schema, logicalType, conversion, datum), out);
+          .getConversionFor(logicalType);
+      if (conversion.getConvertedType().isAssignableFrom(datum.getClass())) {
+        writeWithoutConversion(schema,
+            convert(schema, logicalType, conversion, datum), out);
+      }
     } else {
       writeWithoutConversion(schema, datum, out);
     }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java b/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java
new file mode 100644
index 0000000..3f2b9c5
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestCircularReferences.java
@@ -0,0 +1,384 @@
+package org.apache.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestCircularReferences {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static class Reference extends LogicalType {
+    private static final String REFERENCE = "reference";
+    private static final String REF_FIELD_NAME = "ref-field-name";
+
+    private final String refFieldName;
+
+    public Reference(String refFieldName) {
+      super(REFERENCE);
+      this.refFieldName = refFieldName;
+    }
+
+    public Reference(Schema schema) {
+      super(REFERENCE);
+      this.refFieldName = schema.getProp(REF_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(REF_FIELD_NAME, refFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCE;
+    }
+
+    public String getRefFieldName() {
+      return refFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getField(refFieldName) == null) {
+        throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName);
+      }
+    }
+  }
+
+  public static class Referenceable extends LogicalType {
+    private static final String REFERENCEABLE = "referenceable";
+    private static final String ID_FIELD_NAME = "id-field-name";
+
+    private final String idFieldName;
+
+    public Referenceable(String idFieldName) {
+      super(REFERENCEABLE);
+      this.idFieldName = idFieldName;
+    }
+
+    public Referenceable(Schema schema) {
+      super(REFERENCEABLE);
+      this.idFieldName = schema.getProp(ID_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(ID_FIELD_NAME, idFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCEABLE;
+    }
+
+    public String getIdFieldName() {
+      return idFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      Schema.Field idField = schema.getField(idFieldName);
+      if (idField == null || idField.schema().getType() != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void addReferenceTypes() {
+    LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Referenceable(schema);
+      }
+    });
+    LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Reference(schema);
+      }
+    });
+  }
+
+  public static class ReferenceManager {
+    private interface Callback {
+      void set(Object referenceable);
+    }
+
+    private final Map<Long, Object> references = new HashMap<Long, Object>();
+    private final Map<Object, Long> ids = new IdentityHashMap<Object, Long>();
+    private final Map<Long, List<Callback>> callbacksById = new HashMap<Long, List<Callback>>();
+    private final ReferenceableTracker tracker = new ReferenceableTracker();
+    private final ReferenceHandler handler = new ReferenceHandler();
+
+    public ReferenceableTracker getTracker() {
+      return tracker;
+    }
+
+    public ReferenceHandler getHandler() {
+      return handler;
+    }
+
+    public class ReferenceableTracker extends Conversion<IndexedRecord> {
+      @Override
+      public Class<IndexedRecord> getConvertedType() {
+        return IndexedRecord.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Referenceable.REFERENCEABLE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // read side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        references.put(id, value);
+
+        // call any callbacks waiting to resolve this id
+        List<Callback> callbacks = callbacksById.get(id);
+        for (Callback callback : callbacks) {
+          callback.set(value);
+        }
+        callbacks.clear();
+
+        return value;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // write side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        //references.put(id, value);
+        ids.put(value, id);
+
+        return value;
+      }
+
+      private long getId(IndexedRecord referenceable, Schema schema) {
+        Referenceable info = (Referenceable) schema.getLogicalType();
+        int idField = schema.getField(info.getIdFieldName()).pos();
+        return (Long) referenceable.get(idField);
+      }
+    }
+
+    public class ReferenceHandler extends Conversion<IndexedRecord> {
+      @Override
+      public Class<IndexedRecord> getConvertedType() {
+        return IndexedRecord.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Reference.REFERENCE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) {
+        // read side: resolve the record or save a callback
+        final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+
+        Long id = (Long) record.get(refField.pos());
+        if (id != null) {
+          if (references.containsKey(id)) {
+            record.put(refField.pos(), references.get(id));
+
+          } else {
+            List<Callback> callbacks = callbacksById.get(id);
+            if (callbacks == null) {
+              callbacks = new ArrayList<Callback>();
+              callbacksById.put(id, callbacks);
+            }
+            // add a callback to resolve this reference when the id is available
+            callbacks.add(new Callback() {
+              @Override
+              public void set(Object referenceable) {
+                record.put(refField.pos(), referenceable);
+              }
+            });
+          }
+        }
+
+        return record;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) {
+        // write side: replace a referenced field with its id
+        Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+        IndexedRecord referenced = (IndexedRecord) record.get(refField.pos());
+        if (referenced == null) {
+          return record;
+        }
+
+        // hijack the field to return the id instead of the ref
+        return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced));
+      }
+    }
+
+    private static class HijackingIndexedRecord implements IndexedRecord {
+      private final IndexedRecord wrapped;
+      private final int index;
+      private final Object data;
+
+      public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) {
+        this.wrapped = wrapped;
+        this.index = index;
+        this.data = data;
+      }
+
+      @Override
+      public void put(int i, Object v) {
+        throw new RuntimeException("[BUG] This is a read-only class.");
+      }
+
+      @Override
+      public Object get(int i) {
+        if (i == index) {
+          return data;
+        }
+        return wrapped.get(i);
+      }
+
+      @Override
+      public Schema getSchema() {
+        return wrapped.getSchema();
+      }
+    }
+  }
+
+  @Test
+  public void test() throws IOException {
+    ReferenceManager manager = new ReferenceManager();
+    GenericData model = new GenericData();
+    model.addLogicalTypeConversion(manager.getTracker());
+    model.addLogicalTypeConversion(manager.getHandler());
+
+    Schema parentSchema = Schema.createRecord("Parent", null, null, false);
+
+    Schema parentRefSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.LONG),
+        parentSchema);
+    Reference parentRef = new Reference("parent");
+
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+    childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null));
+    childFields.add(new Schema.Field("parent", parentRefSchema, null, null));
+    Schema childSchema = parentRef.addToSchema(
+        Schema.createRecord("Child", null, null, false, childFields));
+
+    List<Schema.Field> parentFields = new ArrayList<Schema.Field>();
+    parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null));
+    parentFields.add(new Schema.Field("child", childSchema, null, null));
+    parentSchema.setFields(parentFields);
+    Referenceable idRef = new Referenceable("id");
+
+    Schema schema = idRef.addToSchema(parentSchema);
+
+    Record parent = new Record(schema);
+    parent.put("id", 1L);
+    parent.put("p", "parent data!");
+
+    Record child = new Record(childSchema);
+    child.put("c", "child data!");
+    child.put("parent", parent);
+
+    parent.put("child", child);
+
+    // serialization round trip
+    File data = write(model, schema, parent);
+    List<Record> records = read(model, schema, data);
+
+    Record actual = records.get(0);
+
+    // because the record is a recursive structure, equals won't work
+    Assert.assertEquals("Should correctly read back the parent id",
+        1L, actual.get("id"));
+    Assert.assertEquals("Should correctly read back the parent data",
+        new Utf8("parent data!"), actual.get("p"));
+
+    Record actualChild = (Record) actual.get("child");
+    Assert.assertEquals("Should correctly read back the child data",
+        new Utf8("child data!"), actualChild.get("c"));
+    Object childParent = actualChild.get("parent");
+    Assert.assertTrue("Should have a parent Record object",
+        childParent instanceof Record);
+
+    Record childParentRecord = (Record) actualChild.get("parent");
+    Assert.assertEquals("Should have the right parent id",
+        1L, childParentRecord.get("id"));
+    Assert.assertEquals("Should have the right parent data",
+        new Utf8("parent data!"), childParentRecord.get("p"));
+  }
+
+  private <D> List<D> read(GenericData model, Schema schema, File file) throws IOException {
+    DatumReader<D> reader = newReader(model, schema);
+    List<D> data = new ArrayList<D>();
+    FileReader<D> fileReader = null;
+
+    try {
+      fileReader = new DataFileReader<D>(file, reader);
+      for (D datum : fileReader) {
+        data.add(datum);
+      }
+    } finally {
+      if (fileReader != null) {
+        fileReader.close();
+      }
+    }
+
+    return data;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> DatumReader<D> newReader(GenericData model, Schema schema) {
+    return model.createDatumReader(schema);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    File file = temp.newFile();
+    DatumWriter<D> writer = model.createDatumWriter(schema);
+    DataFileWriter<D> fileWriter = new DataFileWriter<D>(writer);
+
+    try {
+      fileWriter.create(schema, file);
+      for (D datum : data) {
+        fileWriter.append(datum);
+      }
+    } finally {
+      fileWriter.close();
+    }
+
+    return file;
+  }
+}

Reply via email to