My bad, the link should be this: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
On Fri, Nov 21, 2014 at 5:29 PM, Timothy Chen <tnac...@gmail.com> wrote:

> Hi Guozhang,
>
> I don't think that is publicly accessible; can you update it to the
> Kafka wiki?
>
> Tim
>
> On Fri, Nov 21, 2014 at 5:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Hi all,
> >
> > I have updated the wiki page (
> > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata )
> > according to people's comments and discussions offline.
> >
> > Guozhang
> >
> > On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hi Jun,
> >>
> >> Sorry for the delay on your comments in the wiki page as well as this
> >> thread; quite swamped now. I will get back to you as soon as I find
> >> some time.
> >>
> >> Guozhang
> >>
> >> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> Thinking about this a bit more: for adding the auditing support, I am
> >>> not sure we need to change the message format by adding the
> >>> application tags. An alternative is to add it in the producer client.
> >>> For example, for each message payload (regardless of the
> >>> serialization mechanism) that a producer receives, the producer can
> >>> just add a header before the original payload. The header will
> >>> contain all the fields needed for auditing (e.g. timestamp, host,
> >>> etc.). This way, we don't need to change the message format, and the
> >>> auditing info can be added independently of the serialization
> >>> mechanism of the message. The header can use a different
> >>> serialization mechanism for better efficiency. For example, if we use
> >>> Avro to serialize the header, the encoded bytes won't include the
> >>> field names in the header. This is potentially more efficient than
> >>> representing those fields as application tags in the message, where
> >>> the tags have to be explicitly stored in every message.
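[Editor's note: Jun's prepend-a-header idea above can be sketched concretely. The fixed binary layout below (version byte, timestamp, host) is an illustrative assumption for this sketch, not the Avro-encoded header Jun suggests, and `AuditHeader` is a hypothetical name.]

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the producer-side audit header: prepend a compact
// header (timestamp + host) to the original payload bytes, independent of
// how the payload itself is serialized. The layout (version byte, long
// timestamp, short host length, host bytes) is an assumption for
// illustration, not a proposed wire format.
public class AuditHeader {
    static final byte VERSION = 0;

    public static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(1 + 8 + 2 + hostBytes.length + payload.length);
        buf.put(VERSION)
           .putLong(timestampMs)
           .putShort((short) hostBytes.length)
           .put(hostBytes)
           .put(payload);
        return buf.array();
    }

    // Strips the header and returns the original payload; a real auditing
    // consumer would also surface the timestamp and host fields.
    public static byte[] unwrap(byte[] wrapped) {
        ByteBuffer buf = ByteBuffer.wrap(wrapped);
        buf.get();                        // version
        buf.getLong();                    // timestamp
        short hostLen = buf.getShort();
        buf.position(buf.position() + hostLen);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```

Because the header has a fixed, self-describing layout, no per-message field names are stored, which mirrors the efficiency argument Jun makes for an Avro-encoded header.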
> >>> To make it easier for the client to add and make use of this kind of
> >>> auditing support, I was imagining that we can add a ProducerFactory
> >>> in the new java client. The ProducerFactory will create an instance
> >>> of Producer based on a config property. By default, the current
> >>> KafkaProducer will be returned. However, a user can plug in a
> >>> different implementation of Producer that does auditing. For example,
> >>> an implementation of AuditProducer.send() can take the original
> >>> ProducerRecord, add the header to the value byte array, and then
> >>> forward the record to an underlying KafkaProducer. We can add a
> >>> similar ConsumerFactory to the new consumer client. If a user plugs
> >>> in an implementation of the AuditingConsumer, the consumer will then
> >>> be audited automatically.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>> > Hi Jun,
> >>> >
> >>> > Regarding 4) in your comment, after thinking it over for a while I
> >>> > cannot come up with a way to do it along with log compaction
> >>> > without adding new fields into the current format of the message
> >>> > set. Do you have a better way that does not require protocol
> >>> > changes?
> >>> >
> >>> > Guozhang
> >>> >
> >>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>> >
> >>> > > I have updated the wiki page incorporating received comments. We
> >>> > > can discuss some more details on:
> >>> > >
> >>> > > 1. How we want to do auditing: whether we want to have built-in
> >>> > > auditing on brokers or even MMs, or use an audit consumer to
> >>> > > fetch all messages from just the brokers.
> >>> > >
> >>> > > 2. How we can avoid de-/re-compression on brokers and MMs with
> >>> > > log compaction turned on.
> >>> > >
> >>> > > 3. How we can resolve the data inconsistency that results from
> >>> > > unclean leader election with control messages.
> >>> > >
> >>> > > Guozhang
> >>> > >
> >>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>> > >
> >>> > >> Thanks for the detailed comments Jun! Some replies inlined.
> >>> > >>
> >>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
> >>> > >>
> >>> > >>> Hi, Guozhang,
> >>> > >>>
> >>> > >>> Thanks for the writeup.
> >>> > >>>
> >>> > >>> A few high level comments.
> >>> > >>>
> >>> > >>> 1. Associating (versioned) schemas to a topic can be a good
> >>> > >>> thing overall. Yes, this could add a bit more management
> >>> > >>> overhead in Kafka. However, it makes sure that the data format
> >>> > >>> contract between a producer and a consumer is kept and managed
> >>> > >>> in a central place, instead of in the application. The latter
> >>> > >>> is probably easier to start with, but is likely to be brittle
> >>> > >>> in the long run.
> >>> > >>>
> >>> > >>
> >>> > >> I am actually not proposing that we drop support for associated
> >>> > >> versioned schemas for topics, but that we not let core Kafka
> >>> > >> functionalities like auditing depend on schemas. This alone can
> >>> > >> separate schema management from Kafka piping management (i.e.
> >>> > >> making sure every single message is delivered, within some
> >>> > >> latency, etc.). Adding additional auditing info into an existing
> >>> > >> schema would force Kafka to be aware of the schema systems
> >>> > >> (Avro, JSON, etc.).
> >>> > >>
> >>> > >>> 2. Auditing can be a general feature that's useful for many
> >>> > >>> applications. Such a feature can be implemented by extending
> >>> > >>> the low level message format with a header. However, it can
> >>> > >>> also be added as part of the schema management. For example,
> >>> > >>> you can imagine a type of audited schema that adds additional
> >>> > >>> auditing info to an existing schema automatically. Performance
> >>> > >>> wise, it probably doesn't make a big difference whether the
> >>> > >>> auditing info is added in the message header or the schema
> >>> > >>> header.
> >>> > >>>
> >>> > >> See replies above.
> >>> > >>
> >>> > >>> 3. We talked about avoiding the overhead of decompressing in
> >>> > >>> both the broker and the mirror maker. We probably need to think
> >>> > >>> through how this works with auditing. In the more general case
> >>> > >>> where you want to audit every message, one has to do the
> >>> > >>> decompression to get the individual messages, independent of
> >>> > >>> how the auditing info is stored. This means that if we want to
> >>> > >>> audit the broker directly, or the consumer in mirror maker, we
> >>> > >>> have to pay the decompression cost anyway. Similarly, if we
> >>> > >>> want to extend mirror maker to support some customized
> >>> > >>> filtering/transformation logic, we also have to pay the
> >>> > >>> decompression cost.
> >>> > >>>
> >>> > >> I see your point. For that I would prefer an MM implementation
> >>> > >> that de-compresses / re-compresses ONLY if required, for example
> >>> > >> by auditing. I agree that we have not thought through whether we
> >>> > >> should enable auditing on MM, and if yes how to do that, and we
> >>> > >> could discuss that in a different thread. Overall, this proposal
> >>> > >> is not just about tackling de-compression on MM but about the
> >>> > >> feasibility of extending the Kafka message header for system
> >>> > >> properties / app properties.
> >>> > >>
> >>> > >>> Some low level comments.
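[Editor's note: the AuditProducer delegation Jun describes earlier in the thread might look roughly like the following. `SimpleProducer` is a deliberately minimized stand-in for the new java client's Producer API (which sends `ProducerRecord`s and returns futures); only the wrap-and-forward pattern is the point, and all names here are hypothetical.]

```java
// Sketch of the decorator pattern behind Jun's AuditProducer idea: wrap the
// value bytes with an audit header, then forward to an underlying producer
// (in the real client, a KafkaProducer). A ProducerFactory would return
// either the plain producer or this wrapper based on a config property.
interface SimpleProducer {
    void send(String topic, byte[] value);
}

class AuditProducer implements SimpleProducer {
    private final SimpleProducer delegate;   // e.g. the real producer
    private final String host;

    AuditProducer(SimpleProducer delegate, String host) {
        this.delegate = delegate;
        this.host = host;
    }

    @Override
    public void send(String topic, byte[] value) {
        // Prepend a (hypothetical) text audit header before forwarding;
        // a real implementation would use a compact binary encoding.
        byte[] header = (System.currentTimeMillis() + "|" + host + "|").getBytes();
        byte[] wrapped = new byte[header.length + value.length];
        System.arraycopy(header, 0, wrapped, 0, header.length);
        System.arraycopy(value, 0, wrapped, header.length, value.length);
        delegate.send(topic, wrapped);
    }
}
```

Because the wrapper only touches the value byte array, the application code and the underlying producer are unchanged, which is what lets auditing be plugged in purely through configuration.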
> >>> > >>> 4. Broker offset reassignment (KAFKA-527): This probably can
> >>> > >>> be done with just a format change on the compressed message
> >>> > >>> set.
> >>> > >>>
> >>> > >> That is true. As I mentioned in the wiki, each of the problems
> >>> > >> may be resolvable separately, but I am thinking about a general
> >>> > >> way to get all of them.
> >>> > >>
> >>> > >>> 5. MirrorMaker refactoring: We probably can think through how
> >>> > >>> general we want mirror maker to be. If we want it to be more
> >>> > >>> general, we likely need to decompress every message just like
> >>> > >>> in a normal consumer. There will definitely be overhead.
> >>> > >>> However, as long as mirror maker is made scalable, we can
> >>> > >>> overcome the overhead by just running more instances on more
> >>> > >>> hardware resources. As for the proposed message format change,
> >>> > >>> we probably need to think through it a bit more. The honor-ship
> >>> > >>> flag seems a bit hacky to me.
> >>> > >>>
> >>> > >> Replied as part of 3). Sure, we can discuss that more; I will
> >>> > >> update the wiki with the collected comments.
> >>> > >>
> >>> > >>> 6. Adding a timestamp in each message can be a useful thing. It
> >>> > >>> (1) allows log segments to be rolled more accurately; (2)
> >>> > >>> allows finding an offset for a particular timestamp more
> >>> > >>> accurately. I am thinking that the timestamp in the message
> >>> > >>> should probably be the time when the leader receives the
> >>> > >>> message. Followers preserve the timestamp set by the leader. To
> >>> > >>> avoid time going back during leader change, the leader can
> >>> > >>> probably set the timestamp to be the max of the current time
> >>> > >>> and the timestamp of the last message, if present. That
> >>> > >>> timestamp can potentially be added to the index file to answer
> >>> > >>> offsetBeforeTimestamp queries more efficiently.
> >>> > >>>
> >>> > >> Agreed.
> >>> > >>
> >>> > >>> 7. Log compaction: It seems that you are suggesting an
> >>> > >>> improvement to compact the active segment as well. This can be
> >>> > >>> tricky, and we need to figure out the details of how to do it.
> >>> > >>> This improvement seems to be orthogonal to the message format
> >>> > >>> change, though.
> >>> > >>>
> >>> > >> I think the improvement is more effective with the timestamps as
> >>> > >> in 6); we can discuss this more.
> >>> > >>
> >>> > >>> 8. Data inconsistency from unclean election: I am not sure we
> >>> > >>> need to add a controlled message to the log during leadership
> >>> > >>> change. The <leader generation, starting offset> map can be
> >>> > >>> maintained in a separate checkpoint file. The follower just
> >>> > >>> needs to get that map from the leader during startup.
> >>> > >>>
> >>> > >> What I was proposing is an alternative solution given that we
> >>> > >> have this message header enhancement; with it we do not need to
> >>> > >> add separate logic for the leadership map and checkpoint file,
> >>> > >> but just the logic on the replica manager to handle this extra
> >>> > >> controlled message and remember the current leader epoch instead
> >>> > >> of a map.
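[Editor's note: the rule Jun proposes in 6) — the leader stamps each message with the max of the current time and the previous message's timestamp, so timestamps never go backwards across clock skew or leader change — can be sketched in a few lines. The class name is hypothetical.]

```java
// Sketch of the monotonic timestamp rule from point 6): even if the
// leader's wall clock jumps backwards (e.g. after a leader change), the
// timestamps written to the log never decrease, which keeps time-based
// index lookups (offsetBeforeTimestamp) well defined.
class MonotonicTimestampAssigner {
    private long lastTimestampMs = Long.MIN_VALUE;

    // wallClockMs would be System.currentTimeMillis() on the leader;
    // returns the timestamp to record in the message.
    synchronized long assign(long wallClockMs) {
        lastTimestampMs = Math.max(wallClockMs, lastTimestampMs);
        return lastTimestampMs;
    }
}
```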
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>>
> >>> > Hello all,
> >>> >
> >>> > I put down some thoughts on enhancing our current message metadata
> >>> > format to solve a bunch of existing issues:
> >>> >
> >>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>> >
> >>> > This wiki page is for kicking off some discussions about the
> >>> > feasibility of adding more info into the message header, and if
> >>> > possible how we would add them.
> >>> >
> >>> > -- Guozhang

--
-- Guozhang