Jay Kreps resolved KAFKA-643.
-----------------------------
Resolution: Fixed

Refactor api definition layer
-----------------------------
Key:              KAFKA-643
URL:              https://issues.apache.org/jira/browse/KAFKA-643
Project:          Kafka
Issue Type:       Improvement
Affects Versions: 0.8.1
Reporter:         Jay Kreps
Assignee:         Jay Kreps

The way we are defining our protocol is really a bit embarrassing. It is full of ad hoc serialization code for each API. This code is fiddly and opaque, and when it has errors they are hard to debug. Since it is all done one-off, it is also very easy for it to become inconsistent. This was tolerable when there were only two apis with a few fields each, but now there are half a dozen more complex apis. By my count there are now over 1,000 lines of code in kafka.apis.*.

One option would be to use Protocol Buffers, Thrift, or another schema-oriented, code-generating RPC framework. However, I think this is probably the wrong direction for a couple of reasons. One is that we want something that works well with our I/O model, both network and disk, which is very NIO-centric, so it should work directly with ByteBuffers. Second, these systems complicate the specification of the protocol: they give a schema, which is a great high-level description, but the translation of that schema to bytes is essentially whatever their code-gen engine chooses to do. They are a great way to build application services, but not great for something like what we are building.

Instead I think we should keep doing what we have done, namely specify the protocol on a wiki, but write a little helper code to make our lives easier. Here is my recommendation for how this code would work. We add two helper classes: Schema and Record.

You define message formats like this:

    import Types._

    val FetchRequestProtocol =
      Schema("ReplicaId" -> int32,
             "MaxWaitTime" -> int32,
             "MinBytes" -> int32,
             Seq("TopicName" -> utf8,
                 Seq("Partition" -> int32,
                     "FetchOffset" -> int64,
                     "MaxBytes" -> int32)))

Note that this almost exactly matches the BNF for the fetch request:

    https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Once defined, this schema can be used to parse messages:

    val record: Record = FetchRequestProtocol.readFrom(buffer)

A Record is just a wrapper around an array. The readFrom method parses out the fields specified in the schema and populates the array. Fields in the record can be accessed by name, e.g.

    record("ReplicaId")

For common access this is probably good enough. However, since each field's position is fixed, it is also possible to get an element by a Field object, which avoids the hashmap lookup and goes directly to the right slot, e.g.

    val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
    ...
    record(ReplicaIdField)

This is for the cases where we are performance conscious and don't want to do umpteen hashmap lookups to resolve string field names.

Likewise in the other direction, to write out a record:

    record.writeTo(buffer)

and to get its size in bytes:

    record.size

Implementing a single read, write, and size method against generic schemas will not only make the underlying protocol clearly defined but also ensure consistent error handling and error reporting. It will be a bit slower, though perhaps not by much, since we can optimize this code.
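For concreteness, here is a minimal sketch (in Scala, like the rest of the codebase) of what these two helpers could look like. It assumes the int32/int64/utf8 types used above and a 2-byte length prefix for strings; the nested repeating groups (the Seq(...) parts of the example) are elided for brevity, and everything beyond the Schema, Record, Field, and Types names from the proposal is illustrative, not actual Kafka code.

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    // A Type knows how to read, write, and size one kind of wire value.
    trait Type {
      def read(buffer: ByteBuffer): Any
      def write(buffer: ByteBuffer, value: Any): Unit
      def sizeOf(value: Any): Int
    }

    object Types {
      val int32: Type = new Type {
        def read(buffer: ByteBuffer): Any = buffer.getInt()
        def write(buffer: ByteBuffer, value: Any): Unit = buffer.putInt(value.asInstanceOf[Int])
        def sizeOf(value: Any): Int = 4
      }
      val int64: Type = new Type {
        def read(buffer: ByteBuffer): Any = buffer.getLong()
        def write(buffer: ByteBuffer, value: Any): Unit = buffer.putLong(value.asInstanceOf[Long])
        def sizeOf(value: Any): Int = 8
      }
      // Strings as length-prefixed UTF-8, assuming a 2-byte length prefix.
      val utf8: Type = new Type {
        def read(buffer: ByteBuffer): Any = {
          val bytes = new Array[Byte](buffer.getShort())
          buffer.get(bytes)
          new String(bytes, StandardCharsets.UTF_8)
        }
        def write(buffer: ByteBuffer, value: Any): Unit = {
          val bytes = value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8)
          buffer.putShort(bytes.length.toShort)
          buffer.put(bytes)
        }
        def sizeOf(value: Any): Int =
          2 + value.asInstanceOf[String].getBytes(StandardCharsets.UTF_8).length
      }
    }

    // A named slot in a schema; caching the Field skips the name lookup on access.
    case class Field(index: Int, name: String, fieldType: Type)

    // An ordered list of named fields. A Schema is itself a Type, so schemas nest.
    class Schema(specs: (String, Type)*) extends Type {
      val fields: Array[Field] =
        specs.zipWithIndex.map { case ((name, t), i) => Field(i, name, t) }.toArray
      private val byName = fields.map(f => f.name -> f).toMap

      def apply(name: String): Field = byName(name)

      def readFrom(buffer: ByteBuffer): Record =
        new Record(this, fields.map(f => f.fieldType.read(buffer)))

      def read(buffer: ByteBuffer): Any = readFrom(buffer)
      def write(buffer: ByteBuffer, value: Any): Unit = value.asInstanceOf[Record].writeTo(buffer)
      def sizeOf(value: Any): Int = value.asInstanceOf[Record].size
    }

    object Schema {
      def apply(specs: (String, Type)*): Schema = new Schema(specs: _*)
    }

    // A Record is just a wrapper around an array of values, one slot per field.
    class Record(schema: Schema, values: Array[Any]) {
      def apply(name: String): Any = values(schema(name).index) // hashmap lookup
      def apply(field: Field): Any = values(field.index)        // direct slot access
      def update(field: Field, value: Any): Unit = values(field.index) = value
      def writeTo(buffer: ByteBuffer): Unit =
        for (f <- schema.fields) f.fieldType.write(buffer, values(f.index))
      def size: Int =
        schema.fields.map(f => f.fieldType.sizeOf(values(f.index))).sum
    }

With these definitions, the calls shown above (FetchRequestProtocol.readFrom(buffer), record("ReplicaId"), record(ReplicaIdField), record.writeTo(buffer), and record.size) all work as written for the flat fields.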
I do realize that this is essentially what Avro, Thrift, or Protocol Buffers do, but I think this is much simpler and can be implemented in a few hundred lines of code with no dependencies. Furthermore, it is a way to implement our protocol, not a way to define a protocol.

In terms of how we would use this, here is what I have in mind. I think we should split the apis into a generic portion and a specific portion, with the generic piece being the header shared by all requests and responses and the specific portion being the bits particular to that message. I recommend we officially implement versioning by allowing multiple versions of each schema and always looking up the right schema for the incoming and outgoing messages. I think we can keep the existing case classes and just map the Scala objects to and from the Record instances in a wrapper layer in front of the existing KafkaApis. The KafkaApis.handle method would disappear; instead, this wrapper would handle message deserialization and call the right method with the right request object. A sketch of that wrapper follows.
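To make the versioned lookup and the dispatch wrapper concrete, here is a hedged sketch building on the Schema/Record classes above. The header layout (a 2-byte api key followed by a 2-byte version), the api key numbering, and the ApiHandlers/RequestDispatcher names are invented for illustration; they are not the actual KafkaApis code.

    import java.nio.ByteBuffer

    // The existing style of request object, kept as a plain case class.
    case class FetchRequest(replicaId: Int, maxWaitTime: Int, minBytes: Int)

    // Stand-in for the handler methods on the existing KafkaApis.
    trait ApiHandlers {
      def handleFetch(request: FetchRequest): Unit
    }

    object ProtocolSchemas {
      // One schema per (apiKey, version). A new protocol version is simply a new
      // entry, so messages from old clients still parse with their own schema.
      private val schemas: Map[(Int, Int), Schema] = Map(
        (1, 0) -> Schema("ReplicaId"   -> Types.int32,
                         "MaxWaitTime" -> Types.int32,
                         "MinBytes"    -> Types.int32)
      )
      def schemaFor(apiKey: Int, version: Int): Schema =
        schemas.getOrElse((apiKey, version),
          sys.error(s"Unknown request: api key $apiKey, version $version"))
    }

    // The wrapper that replaces KafkaApis.handle: read the generic header shared
    // by all requests, look up the schema for that api and version, map the
    // generic Record onto the specific request object, and call the handler.
    class RequestDispatcher(apis: ApiHandlers) {
      def handle(buffer: ByteBuffer): Unit = {
        val apiKey  = buffer.getShort().toInt
        val version = buffer.getShort().toInt
        val record  = ProtocolSchemas.schemaFor(apiKey, version).readFrom(buffer)
        apiKey match {
          case 1 =>
            apis.handleFetch(FetchRequest(record("ReplicaId").asInstanceOf[Int],
                                          record("MaxWaitTime").asInstanceOf[Int],
                                          record("MinBytes").asInstanceOf[Int]))
          case other =>
            sys.error(s"No handler registered for api key $other")
        }
      }
    }

Responses would go through the same table in the other direction: build a Record against the response schema for the negotiated version, then call writeTo on the output buffer.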