[
https://issues.apache.org/jira/browse/KAFKA-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14710216#comment-14710216
]
Ewen Cheslack-Postava commented on KAFKA-2367:
----------------------------------------------
I posted an initial patch. It covers roughly the functionality of Avro's APIs.
The actual data API code (not tests) clocks in at under 600 lines, substantially
smaller than the code from Avro (which didn't even have GenericRecord support
included...). Overall the code (including tests) shrank by 1200 lines and now
has more functionality. The API is broader than Avro's in some ways (more
primitive types to cover types available in other serialization formats,
support for typed keys in maps, etc.); in other ways it currently omits some
things (enums, unions).
Some notes and thoughts:
* I have not defined any logical types yet, but they can be implemented for any
type and identified by the name + version fields. I think most of the effort
here is in the specification. The implementation is just providing the schema
definitions, and occasionally including conversion utilities for built-in java
types that correspond to the type and are likely to be used commonly.
* The version field is a bit weird - it is a byte[]. This lets you, e.g., use a
network byte order integer directly if you want to. I considered just requiring
a string instead. The overhead of requiring encoding to a string seems pretty
minimal for versioning schemes I can think of (incrementing integers, hashes),
but the byte[] seemed like the right way to handle it. One thing the
documentation for version numbers doesn't currently define is how ordering
works. I'm not sure versions are going to be very useful without this (besides
uniquely identifying the schema, it would be nice to know whether a schema is
newer or older than another one), but I'm not sure everyone will have a
versioning scheme that allows for comparing versions without contacting an
external service.
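As a sketch of the point above (not code from the patch): an incrementing
integer version can be encoded directly as a network-byte-order byte[] using
only the JDK. Note that nothing in the API would define how such versions are
ordered.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class VersionDemo {
    // Encode an int version in network byte order (big-endian is
    // ByteBuffer's default byte order).
    static byte[] encodeVersion(int version) {
        return ByteBuffer.allocate(4).putInt(version).array();
    }

    static int decodeVersion(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] v = encodeVersion(7);
        System.out.println(Arrays.toString(v)); // [0, 0, 0, 7]
        System.out.println(decodeVersion(v));   // 7
    }
}
```

Lexicographic comparison of the unsigned big-endian bytes happens to preserve
ordering for non-negative versions, but a consumer has no way to know the
byte[] was produced this way without an out-of-band convention.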
* I provided support for byte[] and ByteBuffer. byte[] is really annoying since
equals/hashCode don't work for it; the situation is made worse because it can
be nested, arbitrarily deeply, in lists and maps. I haven't tried to address
the equals/hashCode problems in those cases. I think it's better to recommend
using ByteBuffer when you need to be able to test for equality/compute
hashcodes. However, this also needs to be respected by deserializers/converters.
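The byte[] problem is easy to demonstrate with plain JDK behavior: arrays
inherit identity-based equals/hashCode from Object, and collection equality
delegates to element equality, so the breakage propagates into nested lists.
ByteBuffer compares contents, so it behaves as expected.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class BytesEqualityDemo {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};
        // Arrays use reference equality inherited from Object.
        System.out.println(a.equals(b));                   // false
        System.out.println(Arrays.equals(a, b));           // true
        // List.equals delegates to element equals, so nesting makes it worse.
        System.out.println(List.of(a).equals(List.of(b))); // false
        // ByteBuffer.equals compares the remaining contents.
        System.out.println(ByteBuffer.wrap(a).equals(ByteBuffer.wrap(b))); // true
    }
}
```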
* Made schemas explicit in the Copycat APIs. Previously schemas were inferred
for primitive types. I think making this explicit might be a bit more effort
for connector developers, but it's ultimately the right thing to do. There are
too many weird edge cases you can get into without explicitly specifying the
schema, especially with list/map types (if it's empty, what are the key/value
schemas?) and optional values (if you encounter a null, what's the schema?).
* Caching of schema conversions. In the JSON implementation we're including,
we're probably being pretty wasteful right now since every record has to
translate both the schema and data to JSON. We should definitely be doing some
caching here. I think an LRU using an IdentityHashMap should be fine. However,
this does assume that connectors are good about reusing schemas (defining them
up front, or if they are dynamic they should have their own cache of schemas
and be able to detect when they can be reused).
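A minimal sketch of the proposed cache (my own code, not the patch): an LRU
keyed by schema *identity*, built from LinkedHashMap's access-order mode with
an identity-comparing key wrapper. As noted above, this only pays off if
connectors reuse the same schema instances.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IdentityLruCache<K, V> {
    // Wrapper giving IdentityHashMap-style semantics inside a LinkedHashMap.
    private static final class IdKey<K> {
        final K ref;
        IdKey(K ref) { this.ref = ref; }
        @Override public boolean equals(Object o) {
            return o instanceof IdKey && ((IdKey<?>) o).ref == ref;
        }
        @Override public int hashCode() { return System.identityHashCode(ref); }
    }

    private final LinkedHashMap<IdKey<K>, V> map;

    public IdentityLruCache(final int maxEntries) {
        // accessOrder=true makes iteration order least-recently-used first;
        // removeEldestEntry evicts once we exceed the capacity.
        map = new LinkedHashMap<IdKey<K>, V>(16, 0.75f, true) {
            @Override protected boolean removeEldestEntry(Map.Entry<IdKey<K>, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public V get(K key) { return map.get(new IdKey<>(key)); }
    public void put(K key, V value) { map.put(new IdKey<>(key), value); }
    public int size() { return map.size(); }
}
```

Because lookups are by identity, two structurally equal schemas built
separately will miss the cache -- which is the "connectors should reuse
schemas" assumption stated above.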
* Many serialization formats don't support unions natively. They require a
separate struct that contains optional fields for all the types (and might
provide syntactic sugar, but it has no impact on the actual encoding). When you
make nullability part of every type, I think the need for unions pretty much
disappears -- creating the appropriate struct is trivial.
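To illustrate the "union as a struct of optional fields" encoding (names here
are hypothetical, not from the patch): a union of {int, string} is just a type
with one nullable field per branch, which is all that formats without native
unions can represent anyway.

```java
// Hypothetical sketch: a tagged union of {int, string} modeled as a struct
// with one optional (nullable) field per branch.
public class IntOrString {
    private final Integer intValue;   // null unless the int branch is set
    private final String stringValue; // null unless the string branch is set

    private IntOrString(Integer i, String s) {
        intValue = i;
        stringValue = s;
    }

    public static IntOrString ofInt(int i) { return new IntOrString(i, null); }
    public static IntOrString ofString(String s) { return new IntOrString(null, s); }

    public boolean isInt() { return intValue != null; }
    public Integer intValue() { return intValue; }
    public String stringValue() { return stringValue; }
}
```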
* I didn't include an IndexedRecord-like interface. I did allow setting a field
by passing in the Field object instead of the field name; this avoids the hash
lookup on the field name and has basically the same effect as setting via an
index. I didn't want to expose something like put(int
fieldNumber, Object value) because using an array internally in Struct is
really an implementation detail to avoid object allocations.
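A hypothetical sketch of that design (the class and method names are mine):
the Field object carries its index, so put(Field, value) writes straight into
the backing array with no name lookup, while the array itself stays hidden as
an implementation detail.

```java
import java.util.HashMap;
import java.util.Map;

public class MiniStruct {
    public static final class Field {
        final String name;
        final int index; // position in the backing array
        Field(String name, int index) { this.name = name; this.index = index; }
    }

    private final Map<String, Field> fieldsByName = new HashMap<>();
    private final Object[] values; // implementation detail, never exposed

    public MiniStruct(String... fieldNames) {
        values = new Object[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++)
            fieldsByName.put(fieldNames[i], new Field(fieldNames[i], i));
    }

    public Field field(String name) { return fieldsByName.get(name); }

    // Name-based put: one hash lookup, then delegates.
    public MiniStruct put(String name, Object value) { return put(field(name), value); }

    // Field-based put: direct array write, no hash lookup.
    public MiniStruct put(Field field, Object value) {
        values[field.index] = value;
        return this; // fluent, for chaining
    }

    public Object get(Field field) { return values[field.index]; }
}
```

Callers that set many records against one schema can look each Field up once
and reuse it, getting index-like performance without an index-based API.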
* The fluent API for SchemaBuilder is different from Avro's. It uses a single
class for building schemas and doesn't directly handle nesting (you can just
run another SchemaBuilder inline in the calls to the parent). This
actually is a substantial difference -- in Copycat the expectation is that most
schemas are created dynamically (most connectors do *not* have fixed schemas to
work with), whereas Avro's seems more targeted towards more static use cases
(not surprisingly). Avro's more complex implementation can catch certain types
of errors at compile time (there are lots of different *Builder classes with
various abilities/restrictions), but for Copycat this doesn't help much and
just makes the code more verbose. See, e.g.,
https://github.com/confluentinc/copycat/blob/master/avro/src/main/java/io/confluent/copycat/avro/AvroData.java
where a lot of the code paths are basically the same and the code could be a
lot more concise, except that when I tried to do so, I couldn't get the
different types from Avro's SchemaBuilder classes to work out properly. Since a
lot of schema building in Copycat is performed incrementally and can't *always*
be chained nicely without a lot of code duplication, having just a single type
is nice and gives you more flexibility in how you structure your code. The
cost is that there are some runtime exceptions you may need to be careful to
catch.
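A toy sketch of the single-class-builder trade-off (all names hypothetical):
one type means a partially built schema can be held in a plain variable and
extended incrementally, e.g. while scanning a source's columns, but misuse
surfaces as a runtime exception rather than a compile error.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniSchemaBuilder {
    private final String type;
    private final List<String> fields = new ArrayList<>();

    private MiniSchemaBuilder(String type) { this.type = type; }

    public static MiniSchemaBuilder struct() { return new MiniSchemaBuilder("struct"); }
    public static MiniSchemaBuilder int32() { return new MiniSchemaBuilder("int32"); }

    public MiniSchemaBuilder field(String name) {
        // One class for every schema type, so this is checked at runtime;
        // Avro's many *Builder classes would make it a compile error instead.
        if (!"struct".equals(type))
            throw new IllegalStateException("fields only allowed on structs");
        fields.add(name);
        return this;
    }

    public String build() {
        return type + (fields.isEmpty() ? "" : fields.toString());
    }
}
```

The incremental use case looks like: hold `MiniSchemaBuilder b = MiniSchemaBuilder.struct();`
in a variable, call `b.field(col)` in a loop over discovered columns, then
`b.build()` -- no chaining required, and only one builder type to pass around.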
* There is only a single API for Struct and struct building, i.e. Structs are
mutable and there is no separate StructBuilder. This keeps the API simpler,
more concise, and there are clear points in the Copycat runtime where we can
call validate() on the data if we want to. This also means that Structs are
"fluent", i.e. return this consistently for chaining. This may seem an odd
choice given I did create separate Schema and SchemaBuilder classes, but I
think that makes sense since you want schemas to be immutable once you've
created them (which we don't quite enforce fully due to returning a mutable
list of fields, but it's pretty close).
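A sketch of the mutable, fluent Struct idea (hypothetical names, not the
patch): puts return this for chaining, and validate() is a separate call the
runtime can invoke at a well-defined point, instead of a builder enforcing
completeness up front.

```java
import java.util.HashMap;
import java.util.Map;

public class FluentStruct {
    private final String[] requiredFields;
    private final Map<String, Object> values = new HashMap<>();

    public FluentStruct(String... requiredFields) {
        this.requiredFields = requiredFields;
    }

    public FluentStruct put(String field, Object value) {
        values.put(field, value);
        return this; // fluent: enables chained puts
    }

    // Deferred validation: the runtime decides when incompleteness is an error.
    public FluentStruct validate() {
        for (String f : requiredFields)
            if (!values.containsKey(f))
                throw new IllegalStateException("missing required field: " + f);
        return this;
    }

    public Object get(String field) { return values.get(field); }
}
```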
* The combination of optional/nullable and default is a bit tricky and I think
the semantics vary across the different serializers. What do you do when you
have an optional field and a default? If you use null (or not serialized) to
indicate the value hasn't been set, then you can't *explicitly* set the field
to null. If you do something more complicated, you have to track more
state/create more objects when setting field values (something like
Option<Object> rather than just Object). The result of this is that if you
actually want to be able to encode nulls, you cannot provide a non-null default
value. In other words, the way to think of the default value is as a *getter*
default value, not a *putter* default value: you only use the default value if
a get() call would return null. I think I've gotten this right, but it's
something to watch out for when reviewing.
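The "getter default" semantics can be pinned down with a few lines (my own
sketch, hypothetical names): null is stored as-is and the default only applies
on get(), so an explicit null put is indistinguishable from never setting the
field -- which is exactly why a field that must encode nulls cannot also carry
a non-null default.

```java
public class DefaultingField {
    private final Object defaultValue;
    private Object value; // null means "unset OR explicitly set to null"

    public DefaultingField(Object defaultValue) {
        this.defaultValue = defaultValue;
    }

    public void put(Object v) { value = v; }

    // Getter default: fall back only when a get() would otherwise return null.
    public Object get() { return value != null ? value : defaultValue; }
}
```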
> Add Copycat runtime data API
> ----------------------------
>
> Key: KAFKA-2367
> URL: https://issues.apache.org/jira/browse/KAFKA-2367
> Project: Kafka
> Issue Type: Sub-task
> Components: copycat
> Reporter: Ewen Cheslack-Postava
> Assignee: Ewen Cheslack-Postava
> Fix For: 0.8.3
>
>
> Design the API used for runtime data in Copycat. This API is used to
> construct schemas and records that Copycat processes. This needs to be a
> fairly general data model (think Avro, JSON, Protobufs, Thrift) in order to
> support complex, varied data types that may be input from/output to many data
> systems.
> This issue should also address the serialization interfaces used
> within Copycat, which translate the runtime data into serialized byte[] form.
> It is important that these be considered together because the data format can
> be used in multiple ways (records, partition IDs, partition offsets), so it
> and the corresponding serializers must be sufficient for all these use cases.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)