
Jay Kreps resolved KAFKA-643.

    Resolution: Fixed

> Refactor api definition layer
> -----------------------------
>                 Key: KAFKA-643
>                 URL: https://issues.apache.org/jira/browse/KAFKA-643
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.8.1
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
> The way we are defining our protocol is really a bit embarrassing. It is full 
> of ad hoc serialization code for each API. This code is very fiddly and 
> opaque and when it has errors they are hard to debug. Since it is all done 
> one-off it is also very easy for it to become inconsistent. This was 
> tolerable when there were only two apis with a few fields each, but now there 
> are a half dozen more complex apis. By my count there are now over 1000 lines 
> of code in kafka.apis.*.
> One option would be to use protocol buffers or thrift or another 
> schema-oriented code gen RPC language. However I think this is probably the 
> wrong direction for a couple of reasons. One is that we want something that 
> works well with our I/O model, both network and disk, which is very 
> NIO-centric. So it should work directly with ByteBuffers. Second I feel that 
> these systems complicate the specification of the protocol. They give a 
> schema, which is a great high-level description, but the translation of that 
> to bytes is essentially whatever their code-gen engine chooses to do. These 
> things are a great way to build application services, but not great for 
> something like what we are building.
> Instead I think we should keep doing what we have done: specify the protocol 
> on the wiki. However we should write a little helper code to make our lives easier.
> Here is my recommendation for how this code would work. We add two helper 
> classes: Schema and Record.
> You define message formats like this:
> import Types._
> val FetchRequestProtocol = 
>   Schema("ReplicaId"->int32, 
>                "MaxWaitTime"->int32, 
>                "MinBytes"->int32,
>                Seq("TopicName"->utf8,
>                       Seq("Partition"->int32, 
>                              "FetchOffset"->int64, 
>                              "MaxBytes"->int32)))
> Note that this almost exactly matches the BNF for the fetch request: 
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
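One way the schema could be modeled as plain data is sketched below. Only the names int32, int64, utf8 and the field names come from the proposal. The Type hierarchy, ArrayOf (standing in for the nested Seq(...) above), the array field names "Topics" and "Partitions", and the bnf helper are hypothetical and exist only for illustration.

```scala
object SchemaModelSketch {
  // Hypothetical wire types; a real version would also carry read/write logic.
  sealed trait Type
  case object int32 extends Type
  case object int64 extends Type
  case object utf8 extends Type
  // A repeated group of fields, standing in for the nested Seq(...) in the proposal.
  final case class ArrayOf(element: Schema) extends Type

  // A schema is an ordered list of (field name, type) pairs.
  final case class Schema(fields: (String, Type)*)

  // "Topics" and "Partitions" are made-up names for the two array fields.
  val FetchRequestProtocol: Schema =
    Schema("ReplicaId" -> int32,
           "MaxWaitTime" -> int32,
           "MinBytes" -> int32,
           "Topics" -> ArrayOf(Schema(
             "TopicName" -> utf8,
             "Partitions" -> ArrayOf(Schema(
               "Partition" -> int32,
               "FetchOffset" -> int64,
               "MaxBytes" -> int32)))))

  // Render the schema in the bracketed style of the wiki BNF.
  def bnf(schema: Schema): String =
    schema.fields.map {
      case (_, ArrayOf(element)) => "[" + bnf(element) + "]"
      case (name, _)             => name
    }.mkString(" ")
}
```

Calling SchemaModelSketch.bnf(SchemaModelSketch.FetchRequestProtocol) yields the right-hand side of a BNF rule, which makes it easy to compare a schema against the wiki by eye.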
> Once defined this schema can be used to parse messages:
>   val record: Record = FetchRequestProtocol.readFrom(buffer)
> A record is just a wrapper around an array. The readFrom method parses out 
> the fields specified in the schema and populates the array. Fields in the 
> record can be accessed by name, e.g. 
>   record("ReplicaId")
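A minimal sketch of how readFrom and by-name access might fit together, for a flat schema only (nested arrays are left out to keep it short). Everything here is an assumption rather than existing Kafka code, including the 16-bit length prefix used for utf8 strings and the FlatSketch schema itself.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object ReadSketch {
  // Hypothetical wire types: each one knows how to decode a single value.
  sealed trait Type { def read(buf: ByteBuffer): Any }
  case object int32 extends Type { def read(buf: ByteBuffer): Any = buf.getInt() }
  case object int64 extends Type { def read(buf: ByteBuffer): Any = buf.getLong() }
  case object utf8 extends Type {
    // Assumes a 16-bit length prefix followed by that many UTF-8 bytes.
    def read(buf: ByteBuffer): Any = {
      val bytes = new Array[Byte](buf.getShort().toInt)
      buf.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }

  // A record wraps an array; the schema maps field names to slots.
  final class Record(schema: Schema, values: Array[Any]) {
    def apply(name: String): Any = values(schema.indexOf(name))
  }

  final case class Schema(fields: (String, Type)*) {
    private val index: Map[String, Int] = fields.map(_._1).zipWithIndex.toMap
    def indexOf(name: String): Int = index(name)
    // Decode the fields in declaration order, one slot per field.
    def readFrom(buf: ByteBuffer): Record =
      new Record(this, fields.map(_._2.read(buf)).toArray[Any])
  }

  // A made-up flat schema, not the real fetch request layout.
  val FlatSketch: Schema =
    Schema("ReplicaId" -> int32, "MaxWaitTime" -> int32, "TopicName" -> utf8)

  // Build a buffer by hand and parse it back.
  def demo(): Record = {
    val buf = ByteBuffer.allocate(64)
    buf.putInt(-1).putInt(100)
    buf.putShort(3.toShort).put("abc".getBytes(StandardCharsets.UTF_8))
    buf.flip()
    FlatSketch.readFrom(buf)
  }
}
```

Because every message goes through the same readFrom loop, a malformed buffer fails in one place rather than in per-API parsing code.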
> For common access this is probably good enough. However since the position is 
> fixed, it is also possible to get the element by a Field object, which gets 
> rid of the hashmap lookup and goes directly to the right slot. E.g.
>   val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
>   ...
>   record(ReplicaIdField)
> This will be for cases where we are a bit performance conscious and don't 
> want to do umpteen hashmap lookups to resolve string field names.
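The two access paths could look roughly like this. The sketch leaves out wire types and parsing to focus on the lookup; the Field class and its index member are hypothetical.

```scala
object FieldSketch {
  // A field remembers its slot, so access by Field needs no map lookup.
  final case class Field(name: String, index: Int)

  final class Schema(names: String*) {
    private val byName: Map[String, Field] =
      names.zipWithIndex.map { case (n, i) => n -> Field(n, i) }.toMap
    // Resolve a name once, e.g. into a global val.
    def apply(name: String): Field = byName(name)
  }

  final class Record(schema: Schema, values: Array[Any]) {
    // Convenient: one hash lookup per call.
    def apply(name: String): Any = values(schema(name).index)
    // Fast path: goes straight to the array slot.
    def apply(field: Field): Any = values(field.index)
  }
}
```

Both overloads return the same value; the Field variant just moves the name resolution out of the hot path.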
> Likewise the other direction, to write out a record:
>   record.writeTo(buffer)
> and to get the size in bytes:
>   record.size
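For the write direction, a sketch under the same assumptions (flat schema, hypothetical Type hierarchy, 16-bit length prefix for utf8) might derive both writeTo and size from the per-type logic:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object WriteSketch {
  // Hypothetical wire types: each one can encode a value and report its size.
  sealed trait Type {
    def write(buf: ByteBuffer, value: Any): Unit
    def sizeOf(value: Any): Int
  }
  case object int32 extends Type {
    def write(buf: ByteBuffer, value: Any): Unit = { buf.putInt(value.asInstanceOf[Int]) }
    def sizeOf(value: Any): Int = 4
  }
  case object utf8 extends Type {
    private def bytes(value: Any): Array[Byte] =
      value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
    // Assumes a 16-bit length prefix followed by the UTF-8 bytes.
    def write(buf: ByteBuffer, value: Any): Unit = {
      val b = bytes(value)
      buf.putShort(b.length.toShort)
      buf.put(b)
    }
    def sizeOf(value: Any): Int = 2 + bytes(value).length
  }

  final case class Schema(fields: (String, Type)*)

  final class Record(schema: Schema, values: Array[Any]) {
    // Pair each value with the type declared for its slot.
    private def typed: Seq[(Type, Any)] = schema.fields.map(_._2).zip(values.toSeq)
    // Total encoded size in bytes.
    def size: Int = typed.map { case (t, v) => t.sizeOf(v) }.sum
    // Encode the fields in declaration order.
    def writeTo(buf: ByteBuffer): Unit = typed.foreach { case (t, v) => t.write(buf, v) }
  }
}
```

Since size and writeTo walk the same schema, a buffer allocated with record.size should be filled exactly by record.writeTo.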
> Implementing a single read, write, and size method with generic schemas will 
> not only make the underlying protocol clearly defined but also ensure good 
> error handling, error reporting, etc. It will be a bit slower, though maybe 
> not by much, because we can optimize this code.
> I do realize that this is essentially what Avro or Thrift or ProtocolBuffers 
> do, but I think this is much simpler, and can be implemented in a few hundred 
> lines of code with no dependencies. Furthermore it is a way to implement our 
> protocol, not a way to define a protocol.
> In terms of how we use this, this is what I have in mind:
> I think we should split the apis into a generic and a specific portion, with 
> the generic piece being the header shared by all requests and responses, and 
> the specific portion being the bits for that message. I recommend we 
> officially implement versioning by allowing multiple versions of the schemas 
> and always looking up the right schema for the incoming and outgoing 
> messages. I think we can keep the existing case classes, and just map the 
> scala objects to and from the record instances in a wrapper layer prior to 
> the existing KafkaApis. The KafkaApis.handle method would disappear and 
> instead this wrapper would handle message deserialization and calling the 
> right method with the right request object.
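The schema lookup by version could be as simple as a map keyed by api key and version. The sketch below is purely illustrative: the registry, the schemaFor helper, the key and version numbers, and the "ExtraField" in version 1 are all made up and do not describe any real protocol version.

```scala
object VersioningSketch {
  // Field names only; wire types are omitted to keep the focus on lookup.
  final case class Schema(fieldNames: String*)

  // Hypothetical registry keyed by (api key, api version).
  val schemas: Map[(Short, Short), Schema] = Map(
    (1.toShort, 0.toShort) -> Schema("ReplicaId", "MaxWaitTime", "MinBytes"),
    // "ExtraField" is invented to show a second version with a different layout.
    (1.toShort, 1.toShort) -> Schema("ReplicaId", "MaxWaitTime", "MinBytes", "ExtraField")
  )

  // The wrapper layer would read key and version from the shared header,
  // then pick the matching schema or reject the request.
  def schemaFor(apiKey: Short, version: Short): Either[String, Schema] =
    schemas.get((apiKey, version))
      .toRight(s"no schema for api key $apiKey, version $version")
}
```

Returning an Either keeps the unknown-version case explicit, so the wrapper can turn it into a proper error response instead of a parse failure.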

This message was sent by Atlassian JIRA
