Hey Ismael,

Yes, you are correct, I remember now why I didn't do that. I rescind that suggestion. I still think lazy initialization is more in keeping with what we've done, if feasible.
-Jay

On Mon, Dec 12, 2016 at 11:36 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jay,
>
> I like the idea of having a single `init`, but I am not sure about the specifics of the metadata initialisation (as Jason alluded to). More inline.
>
> On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > 1. Add a generic init() call which initializes both transactions and metadata
>
> Would this initialise metadata for all topics? One advantage of doing the metadata call during `send` is that we only retrieve metadata for the subset of topics that you are producing to. For large clusters, retrieving the metadata for all the topics is relatively expensive and I think users would prefer to avoid that unless there are some concrete benefits. We could pass the topics to `init`, but that seems a bit clunky.
>
> > 2. If you don't call init(), metadata is initialized on the first send (as now)
>
> We need to maintain the logic to refresh the metadata on `send` anyway if you try to send to a topic that is missing from the metadata (e.g. if it's added after the `init` method is called, assuming that we don't expect people to call `init` more than once), so that seems fine.
>
> > and transactions are lazily initialized at the first beginTransaction() call.
>
> I'll leave it to Jason to say if this is feasible. However, if it is, it seems like we can just do this and avoid the `init` method altogether?
>
> Ismael
>
> On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Jason/Neha,
> >
> > Yeah, clearly having a mandatory, generic init() method that initializes both transactions and topic metadata would be the ideal solution. This would solve the occasional complaint about blocking behavior during initialization of metadata (or at least shift it to a new complaint about an inability to initialize when the cluster isn't up in test environments).
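The two-path scheme proposed in the thread (an optional eager init(), with lazy fallbacks in send() and beginTransaction() for code that never calls it) can be sketched roughly as follows. This is a toy illustration only, not the real KafkaProducer: the names `SketchProducer`, `ensureMetadata`, and `ensureTransactions` are invented for the example.

```java
// Toy sketch of the two-path initialization discussed in the thread: an
// optional init() that eagerly does the blocking work, with lazy fallbacks
// for applications that never call it. All names here are invented; this
// is not the real KafkaProducer API.
import java.util.HashSet;
import java.util.Set;

class SketchProducer {
    private final Set<String> topicsWithMetadata = new HashSet<>();
    private boolean transactionsReady = false;

    /** Optional: eagerly initialize transactions. Eager metadata for *all*
     *  topics is what Ismael pushes back on (expensive on large clusters),
     *  so this sketch leaves metadata lazy even when init() is called. */
    void init() {
        ensureTransactions();
    }

    void send(String topic, String value) {
        ensureMetadata(topic);   // lazy, per-topic, as the producer does today
        // ... append the record to the accumulator ...
    }

    void beginTransaction() {
        ensureTransactions();    // lazy fallback if init() was never called
        // ... transactional bookkeeping ...
    }

    private void ensureMetadata(String topic) {
        if (topicsWithMetadata.add(topic))
            System.out.println("fetching metadata for " + topic);
    }

    private void ensureTransactions() {
        if (!transactionsReady) {
            transactionsReady = true;
            System.out.println("initializing transactions");
        }
    }

    boolean transactionsReady() { return transactionsReady; }
    int knownTopics() { return topicsWithMetadata.size(); }
}
```

Under this sketch, a non-transactional application that never calls init() behaves exactly as today, while a transactional one may call init() up front to move the blocking work out of the send path.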
> > The challenge is that we can't do this because it isn't backwards compatible with existing apps that don't call init.
> >
> > The alternative of having an optional generic init() call is a bit odd, because to figure out whether you need to call it you have to discover what it does, which is not generic: it initializes transactions. We can't really add more logic to init because it only gets invoked by transaction users, so it doesn't really function as a generic init.
> >
> > What do you think of this solution:
> >
> > 1. Add a generic init() call which initializes both transactions and metadata
> > 2. If you don't call init(), metadata is initialized on the first send (as now) and transactions are lazily initialized at the first beginTransaction() call.
> >
> > -Jay
> >
> > On Mon, Dec 12, 2016 at 9:17 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > @Neha
> > >
> > > > 1. I think we should consider renaming initTransactions to just init() and moving the metadata initialization there. Let's make sure we don't add APIs that are relevant to this proposal only. Instead, try to think what we'd propose if we were writing the producer from scratch today. I suspect we would end up with an init() API that would do the metadata initialization as well as the transaction stuff lazily. If so, let's make that change now.
> > >
> > > I think the only awkwardness with `init()` is that it would probably have to be an optional API for non-transactional usage to support existing code. I'm also not sure what metadata we can actually initialize at that point since we don't know which topics will be produced to. That said, I'm also not fond of the `initTransactions` name, and we may find other uses for a generic `init()` in the future, so I'm in favor of this renaming.
> > >
> > > > 2. Along the same lines, let's think about the role of each id that the producer will have and see if everything still makes sense. For instance, we have quite a few per-producer-instance notions -- client.id, a producer id and a transaction.app.id, some set via config and some generated on-the-fly. What role does each play, how do they relate to each other and is there an opportunity to get rid of any.
> > >
> > > The abundance of ids is super annoying. The producer ID is not actually exposed in either the producer or consumer, but I'm not sure how successful we'll be in hiding its existence from the user (you probably need to know about it for debugging and administrative purposes at least). This issue has been a continual thorn and I'm not sure I have a great answer. We have been tempted to use client.id as the AppID at one point or another, but its current usage is to have the same value for all producers in an application. The lack of an AppID meant that we would have to expose the producer ID and the application would be responsible for persisting it. In the use cases we looked at, it was simpler to let the application provide its own ID through configuration. And in use cases where there was no obvious ID to serve as the AppID, it seemed simple enough to let the application generate its own. We also looked at removing the producer ID. This was discussed somewhere above, but basically the idea is to store the AppID in the message set header directly and avoid the mapping to producer ID altogether. As long as batching isn't too bad, the impact on total size may not be too bad, but we were ultimately more comfortable with a fixed-size ID.
> > >
> > > > 3. I think we should definitely consider renaming transaction.app.id to something else. Given that we already have a notion of application.id and it represents the entire Streams application, having transaction.app.id that represents a producer instance is confusing. I do understand that, for Streams, the user doesn't have to set transaction.app.id as it will likely be application.id+taskId (am I understanding that correctly?)
> > >
> > > Your understanding is correct. The "transaction" prefix was intended to make it clear that it was only needed for transactional usage. We've also referred to the AppID as a producer "instance ID." This is more suggestive of the fact that it needs to be unique within the producers of a particular application. Maybe we could drop the "transaction" and use "instance.id" or "app.instance.id"? Not sure that's any better, but perhaps it avoids the confusion with application.id?
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Dec 12, 2016 at 8:37 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > >
> > > > @Becket
> > > >
> > > > > It has been a pain in many cases that we do not know the number of messages in a message set, not sure if the OffsetDelta field in the wrapper message will address this.
> > > >
> > > > Interestingly, we had this in one of the design iterations, but we found in the prototype that we weren't really using it. Did you have a particular use case in mind? I share the intuition that it may be helpful to know, but I don't have a clear example in mind. In fact, in the initial version, we attempted to let the message set always represent a contiguous sequence of messages. In that case, the message set only needed a base offset and a count of the number of messages, and the individual messages no longer needed the offset delta.
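The two message-set encodings Jason contrasts can be made concrete with a small sketch. This is illustrative only (the class and method names are invented, not the real log format code): per-message offset deltas can express gaps, which is what log compaction produces; a bare base offset plus count forces the set to be contiguous.

```java
// Sketch of the two message-set encodings discussed above (names invented).
// With per-message offset deltas, gaps left by compaction are representable.
// With only a base offset and a count, the set must be contiguous, which is
// why that encoding was uncomfortable for compacted topics.
import java.util.ArrayList;
import java.util.List;

class MessageSets {
    /** Delta style: absolute offset = baseOffset + per-message delta. */
    static List<Long> withDeltas(long baseOffset, int[] offsetDeltas) {
        List<Long> offsets = new ArrayList<>();
        for (int d : offsetDeltas)
            offsets.add(baseOffset + d);   // deltas may skip compacted-away messages
        return offsets;
    }

    /** Contiguous style: just a base offset and a message count. */
    static List<Long> contiguous(long baseOffset, int count) {
        List<Long> offsets = new ArrayList<>();
        for (int i = 0; i < count; i++)
            offsets.add(baseOffset + i);   // cannot express gaps
        return offsets;
    }
}
```

Note that the contiguous form does carry the message count directly, which is the property Becket asks about; the delta form only yields a count by iterating the set.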
> > > > We ultimately abandoned that because we were uncomfortable with its impact on compaction.
> > > >
> > > > -Jason
> > > >
> > > > On Mon, Dec 12, 2016 at 5:55 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Andrew,
> > > > >
> > > > > As I mentioned above, in Kafka durability is supported via data replication instead of sync-flushing to disks. KIP-98 does not try to change that part of Kafka: if all your replicas are gone at the same time before the data was ever flushed to disk, then your data is lost today, and it will still be the case after KIP-98.
> > > > >
> > > > > As for atomicity, KIP-98 does provide an all-or-nothing guarantee for writes to multiple partitions, and it is based on the existing durability guarantees. So it is possible that if your durability breaks, then atomicity will be violated: some of a committed transaction's messages could be lost if the above scenarios happen while others can be successfully appended. My take is that, if you have concerns that Kafka's replication mechanism is not good enough for your durability requirements as of today, then you should have the same level of concerns with durability if you want to use Kafka with KIP-98 as your transactional queuing system as well.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Dec 12, 2016 at 1:49 AM, Andrew Schofield <andrew_schofi...@live.com> wrote:
> > > > >
> > > > > > Guozhang,
> > > > > > Exactly. This is the crux of the matter. Because it's async, the log is basically slightly out of date wrt the run-time state, and a failure of all replicas might take the data slightly back in time.
> > > > > >
> > > > > > Given this, do you think that KIP-98 gives an all-or-nothing, no-matter-what guarantee for Kafka transactions? I think the key is whether the data which is asynchronously flushed is guaranteed to be recovered atomically in all cases. Asynchronous but atomic would be good.
> > > > > >
> > > > > > Andrew Schofield
> > > > > > IBM Watson and Cloud Platform
> > > > > >
> > > > > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > > > Sent: 09 December 2016 22:59
> > > > > > > To: dev@kafka.apache.org
> > > > > > > Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging
> > > > > > >
> > > > > > > Onur,
> > > > > > >
> > > > > > > I understand your question now. So it is indeed possible that after commitTxn() returned the messages could still be lost permanently if all replicas failed before the data was flushed to disk. This is the virtue of Kafka's design to rely on replication (probably in memory) for high availability, hence async flushing. This scenario already exists today and KIP-98 did not intend to change this factor in any way.
> > > > > > >
> > > > > > > Guozhang
> > > > >
> > > > > --
> > > > > -- Guozhang
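The failure mode Andrew and Guozhang are discussing can be modelled in a few lines. This is a deliberately tiny toy (all names invented, no relation to Kafka's actual replica or log implementation): an append is acknowledged once replicated in memory, flushing to disk happens asynchronously, and if every replica crashes before the flush, acknowledged messages (possibly only part of a committed transaction) are lost.

```java
// Toy model of acknowledged-but-unflushed data loss (invented for
// illustration). Appends are acked after in-memory replication; only the
// flushed prefix of the log survives a crash of all replicas.
import java.util.ArrayList;
import java.util.List;

class ToyReplica {
    private final List<String> inMemoryLog = new ArrayList<>();
    private int flushedCount = 0;   // length of the log prefix that is on disk

    void append(String msg) { inMemoryLog.add(msg); }          // acked pre-flush
    void flushAsync() { flushedCount = inMemoryLog.size(); }   // background flush
    List<String> recoverAfterCrash() {                         // disk contents only
        return new ArrayList<>(inMemoryLog.subList(0, flushedCount));
    }
}
```

As Guozhang notes, this window exists in Kafka with or without KIP-98; in practice it is covered by replication across machines rather than by synchronous flushing, so losing it requires all replicas to fail at once.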