I see: having both the topic data and the changelog in Kafka makes the
Kafka transaction atomic.  But in the streaming case, RocksDB is an
external data store while the changelog is being applied to it.  In terms
of rolling back and resuming/re-applying a Kafka transaction, how do we
make sure the RocksDB updates can also be rolled back and re-applied at
the Kafka transaction boundary?
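
To make the question concrete, here is a rough sketch (Python, purely
illustrative) of the recovery path described in the reply quoted below:
the local store is never rolled back; after a fatal error it is rebuilt
from the changelog, either from scratch or from a checkpoint, and only
committed records are applied.  ChangelogRecord, its `committed` flag and
restore_store are made-up names for this sketch, not Kafka Streams or
RocksDB APIs, and a plain dict stands in for RocksDB.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass(frozen=True)
class ChangelogRecord:
    offset: int
    key: str
    value: Optional[str]  # None marks a delete (tombstone)
    committed: bool       # hypothetical stand-in for "in a committed transaction"


def restore_store(changelog: Iterable[ChangelogRecord],
                  checkpoint_offset: int = 0,
                  checkpoint_state: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Rebuild the local store from committed changelog records only."""
    store = dict(checkpoint_state or {})
    for rec in changelog:
        if rec.offset < checkpoint_offset:
            continue  # already reflected in the checkpointed state
        if not rec.committed:
            continue  # written by an aborted transaction: never applied
        if rec.value is None:
            store.pop(rec.key, None)
        else:
            store[rec.key] = rec.value
    return store


changelog = [
    ChangelogRecord(0, "a", "1", committed=True),
    ChangelogRecord(1, "b", "2", committed=True),
    ChangelogRecord(2, "a", "99", committed=False),  # transaction aborted
]

# Worst case: wipe the store and replay the whole changelog.
from_scratch = restore_store(changelog)

# Cheaper case: start from a checkpoint taken at changelog offset 1.
from_checkpoint = restore_store(changelog, checkpoint_offset=1,
                                checkpoint_state={"a": "1"})
print(from_scratch, from_checkpoint)
```

In this toy model the aborted write to "a" never reaches the rebuilt
store, so the store and the committed changelog agree again without the
store itself supporting rollback.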


On Thu, Dec 1, 2016 at 11:05 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> @Henry Cai,
>
> I am working on a separate KIP on Streams to leverage this KIP to have
> exactly-once processing semantics (note the exactly-once processing is a
> bit different from exactly-once delivery semantics), which should cover
> your question.
>
> The short answer is that writing the changelog messages needs to be part of
> the transaction. When a fatal error happens within a transaction, the store
> updates cannot be rolled back the way the messages can, so in the worst case
> we need to restore the store from the changelog, either from scratch or from
> a checkpoint with a starting offset in the changelog. The restoring consumer
> will fetch only committed messages as well.
>
>
> Guozhang
>
> On Thu, Dec 1, 2016 at 9:34 AM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Hi Daniel,
> >
> > That is a very good point. You are correct in saying that one does not
> need
> > a transaction coordinator to get idempotent semantics.
> >
> > There are, however, three reasons why we chose this route:
> >
> >    1. The request to find a transaction coordinator is exactly the same as
> >    the request consumers use to find the group coordinator. So if clients
> >    already implement the new consumer, they should already have the code
> >    they need to find the transaction coordinator. I would even go so far
> >    as to say that the majority of the coordinator discovery code can be
> >    effectively shared between producers and consumers. Jason should
> >    correct me on this, however, since he is most familiar with that bit.
> >    2. With this route, the broker side changes are simpler. In particular,
> >    we have to implement the InitPIDRequest only in the coordinator.
> >    3. By always having a transaction coordinator, we can enable
> >    applications to use transactions even if they don't specify the AppId.
> >    The only thing you lose is transaction recovery across sessions.
> >
> > Needless to say, we did debate this point extensively. What swung our
> > decision ultimately was the following observation: if the user does not
> > provide a transaction.app.id, the client can generate a UUID and use
> that
> > as the appId for the rest of the session. This means that there are no
> > branches in the client and server code, and is overall simpler to
> maintain.
> > All the producer APIs are also available to the user and it would be more
> > intuitive.
> >
> > It also means that clients cannot choose idempotence without
> transactions,
> > and hence it does place a greater burden on implementors of kafka
> clients.
> > But the cost should be minimal given point 1 above, and was deemed worth
> > it.
> >
> > Thanks once more for your thoughtful comments. It would be great for
> other
> > client implementors to chime in on this.
> >
> > Regards,
> > Apurva
> >
> >
> > On Thu, Dec 1, 2016 at 3:16 AM, Daniel Schierbeck <da...@zendesk.com.invalid> wrote:
> >
> > > Hi there,
> > >
> > > I'm the author of ruby-kafka, and as such am slightly biased towards
> > > simplicity of implementation :-)
> > >
> > > I like the proposal, and would love to use idempotent producer
> semantics
> > in
> > > our projects at Zendesk, but I'm a bit worried about the complexity
> that
> > > would go into the clients; specifically: it sounds to me that in order
> to
> > > get idempotent producer semantics, I'd have to implement the
> transaction
> > > coordinator discovery. I may be wrong, but it would seem that it's not
> > > strictly necessary if you're not using transactions – we could just use
> > the
> > > topic partition's leader as the coordinator, avoiding the extra
> > discovery.
> > > In my experience, most bugs are related to figuring out which broker is
> > the
> > > leader of which partition/group/whatever, so minimizing the number of
> > > moving parts would be beneficial to me. I'd also like to point out
> that I
> > > would be reluctant to implement the transaction API in the near future,
> > but
> > > would love to implement the idempotency API soon. The former seems only
> > > relevant to real stream processing frameworks, which is probably not
> the
> > > best use case for ruby-kafka.
> > >
> > > Cheers,
> > > Daniel Schierbeck
> > >
> > > On Thu, Dec 1, 2016 at 9:54 AM Jason Gustafson <ja...@confluent.io>
> > wrote:
> > >
> > > > Hey Neha,
> > > >
> > > > Thanks for the thoughtful questions. I'll try to address the first
> > > question
> > > > since Apurva addressed the second. Since most readers are probably
> > > getting
> > > > up to speed with this large proposal, let me first take a step back
> and
> > > > explain why we need the AppID at all. As Confluent tradition
> demands, I
> > > > present you a big wall of text:
> > > >
> > > > Clearly "exactly once" delivery requires resilience to client
> failures.
> > > > When a client crashes or turns into a zombie, another client must
> > > > eventually be started to resume the work. There are two problems: 1)
> we
> > > > need to ensure that the old process is actually dead or at least that
> > it
> > > > cannot write any more data, and 2) we need to be able to pick up
> > wherever
> > > > the last process left off. To do either of these, we need some kind
> of
> > > > identifier to tie the two instances together.
> > > >
> > > > There are only two choices for where this ID comes from: either the
> > user
> > > > gives it to us or the server generates it. In the latter case, the
> user
> > > is
> > > > responsible for fetching it from the client and persisting it
> somewhere
> > > for
> > > > use after failure. We ultimately felt that the most flexible option
> is
> > to
> > > > have the user give it to us. In many applications, there is already a
> > > > natural identifier which is already used to divide the workload. For
> > > > example, in Kafka Streams and Kafka Connect, we have a taskId. For
> > > > applications where there is no natural ID, the user can generate a
> UUID
> > > and
> > > > persist it locally, which is as good as having the server generate
> it.
> > > >
> > > > So the AppID is used to provide continuity between the instances of a
> > > > producer which are handling a certain workload. One of the early
> design
> > > > decisions we made in this work was to make the delivery guarantees we
> > > > provide agnostic of the workload that the producer is assigned. The
> > > > producer is not in the business of trying to divide up the work among
> > all
> > > > its peers who are participating in the same duty (unlike the
> consumer,
> > we
> > > > don't know anything about where the data comes from). This has huge
> > > > implications for "exactly-once" delivery because it puts the burden
> on
> > > the
> > > > user to divide the total workload among producer instances and to
> > assign
> > > > AppIDs accordingly.
> > > >
> > > > I've been using the term "workload" loosely, but we usually imagine
> > > > something like Kafka Connect's notion of a "source partition." A
> source
> > > > partition could be a topic partition if the source is Kafka, or it
> > could
> > > be
> > > > a database table, a log file, or whatever makes sense for the source
> of
> > > the
> > > > data. The point is that it's an independent source of data which can
> be
> > > > assigned to a producer instance.
> > > >
> > > > > If the same source partition is always assigned to the producer with
> > > > > the same AppID, then Kafka transactions will give you "exactly once"
> > > > > delivery without much additional work. On initialization, the producer
> > > > > will ensure that 1) any previous producers using that AppID are
> > > > > "fenced" off, and 2) any transaction which had been started by a
> > > > > previous producer with that AppID has either completed or aborted.
> > > >
> > > > Based on this, it should be clear that the ideal is to divide the
> > > workload
> > > > so that you have a one-to-one mapping from the source partition to
> the
> > > > AppID. If the source of the data is Kafka, then the source partition
> is
> > > > just a topic partition, and the AppID can be generated from the name
> of
> > > the
> > > > topic and the partition number.
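
A toy model of the two paragraphs above, as a sketch only: one AppID is
derived per source partition, and a newer producer instance for that
AppID fences off the older one.  It assumes fencing is done by comparing
epochs, which the epoch discussion further down the thread suggests but
which this sketch does not take from the design doc.  ToyBroker,
ProducerFencedError, app_id_for and the "topic-partition" naming scheme
are all invented for illustration; completing or aborting the previous
producer's open transaction is not modelled.

```python
from typing import Dict, List, Tuple


class ProducerFencedError(Exception):
    """Raised when a write carries an epoch older than the latest one."""


def app_id_for(topic: str, partition: int) -> str:
    # One AppID per source partition; the naming scheme is only an example.
    return f"{topic}-{partition}"


class ToyBroker:
    def __init__(self) -> None:
        self.latest_epoch: Dict[str, int] = {}
        self.log: List[Tuple[str, int, str]] = []

    def init_producer(self, app_id: str) -> int:
        # Each new producer instance for an AppID gets a higher epoch.
        epoch = self.latest_epoch.get(app_id, -1) + 1
        self.latest_epoch[app_id] = epoch
        return epoch

    def append(self, app_id: str, epoch: int, message: str) -> None:
        # Writes from an instance that has been superseded are rejected.
        if epoch < self.latest_epoch.get(app_id, -1):
            raise ProducerFencedError(f"{app_id}: epoch {epoch} is stale")
        self.log.append((app_id, epoch, message))


broker = ToyBroker()
app_id = app_id_for("clicks", 3)

old_epoch = broker.init_producer(app_id)  # first instance
broker.append(app_id, old_epoch, "m1")

new_epoch = broker.init_producer(app_id)  # replacement instance starts
broker.append(app_id, new_epoch, "m2")

try:
    broker.append(app_id, old_epoch, "zombie write")
    zombie_rejected = False
except ProducerFencedError:
    zombie_rejected = True
```

Because the AppID is a pure function of the source partition, whichever
process picks up partition 3 of "clicks" next lands on the same AppID and
therefore fences its predecessor.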
> > > >
> > > > To finally get back to your auto-scaling question, let's assume for a
> > > > moment the ideal mapping of source partition to AppID. The main
> > question
> > > is
> > > > whether the scaling is "horizontal" or "vertical." By horizontal, I
> > mean
> > > an
> > > > increase in the number of source partitions. This case is easy.
> Assign
> > > new
> > > > AppIDs based on the new source partitions and you're done.
> > > >
> > > > But if the scaling is vertical (i.e. an increase in the load on the
> > > source
> > > > partitions), there's not much this proposal can do to help. You're
> > going
> > > to
> > > > have to break the source partition into child partitions, and assign
> > each
> > > > of the new partitions a new AppID. To preserve "exactly once"
> delivery,
> > > you
> > > > must make sure that the producers using the AppID assigned to the
> > parent
> > > > > partition have been shut down cleanly. We could provide a way to pass
> > in a
> > > > "parent AppID" so that the producer could check the appropriate
> safety
> > > > conditions, but for the first version, we assume that users consider
> > > > scaling requirements when dividing the workload into source
> partitions.
> > > >
> > > > Unfortunately, the real world is always falling short of the ideal,
> and
> > > > it's not always practical to have a one-to-one mapping of source
> > > partition
> > > > to AppID, since that also implies a one-to-one mapping of source
> > > partition
> > > > to producer instance. If I were a user, I'd push this limit as far as
> > is
> > > > reasonable, but with enough source partitions, it eventually breaks
> > down.
> > > > At some point, you need a producer to handle the load of more than
> one
> > > > source partition. This is fine in itself if the assignment is sticky:
> > > that
> > > > is, if we can ensure that the same source partition is assigned to
> the
> > > > producer using a certain AppID. If not, then the user is responsible
> > for
> > > > ensuring a clean hand-off. The producer reading from the migrating
> > source
> > > > partition must stop reading, commit or abort any transaction
> containing
> > > > data processed from that source partition, and then signal the
> producer
> > > > which is taking over that it is safe to begin.
> > > >
> > > > This burden is a consequence of the decision to keep the producer out
> > of
> > > > the role of assigning work. We could do more if we forced users to
> > > > formalize their application-specific notion of a source partition,
> and
> > if
> > > > we turned the producer into something like a consumer group, with a
> > > > rebalance protocol. This would allow the broker to be the one to
> > ensure a
> > > > clean hand-off of work, but it would be a huge departure from the way
> > the
> > > > producer currently works, and not all applications have a notion of
> > > source
> > > > partition anyway. So the result is a bit more work for the user,
> though
> > > of
> > > > > course it would be transparent for Kafka Streams users.
> > > >
> > > > One final note. I've described above how to get the strongest
> > guarantees
> > > > that this work is capable of providing in an auto-scaling
> environment.
> > We
> > > > also provide weaker guarantees, which are still an improvement over
> the
> > > > > current state. For example, without specifying any kind of AppID, we
> > > > > provide idempotent production for the lifetime of a producer instance.
> > > > > This ensures reliable delivery without duplicates even with broker
> > > > > failures. It is also possible to use transactions with an ephemeral
> > > > > AppID. If the application generates a UUID for use as the AppID, and
> > > > > only uses it for the lifetime of a single producer, you can still take
> > > > > advantage of transactional semantics, which allow you to write a set of
> > > > > messages to multiple partitions atomically.
> > > >
> > > > Hope that answers the question and helps others understand the work a
> > bit
> > > > better!
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Nov 30, 2016 at 9:51 PM, Apurva Mehta <apu...@confluent.io>
> > > wrote:
> > > >
> > > > > Thanks for your comment, I updated the document. Let me know if it
> is
> > > > clear
> > > > > now.
> > > > >
> > > > > Apurva
> > > > >
> > > > > On Wed, Nov 30, 2016 at 9:42 PM, Onur Karaman <
> > > > > onurkaraman.apa...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > @Apurva yep that's what I was trying to say.
> > > > > >
> > > > > > Original message:
> > > > > > If there is already an entry with the AppID in the mapping,
> > increment
> > > > the
> > > > > > epoch number and go on to the next step. If there is no entry
> with
> > > the
> > > > > > AppID in the mapping, construct a PID with initialized epoch
> > number;
> > > > > append
> > > > > > an AppID message into the transaction topic, insert into the
> > mapping
> > > > and
> > > > > > reply with the PID / epoch / timestamp.
> > > > > >
> > > > > > Just wanted to make it explicit because:
> > > > > > 1. The "append an AppID message..." chunk was ambiguous on
> whether
> > it
> > > > > > applied to the "if exists" or "if not exists" condition
> > > > > > 2. I think the google doc is pretty explicit on appending to the
> > log
> > > > > > everywhere else.
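
A small sketch of step 2 as clarified above, with the log append made
explicit in both branches.  ToyTransactionCoordinator, handle_init_pid and
the in-memory list standing in for the transaction topic are hypothetical
names for illustration only; the timestamp in the reply is left out, and
starting the epoch at 0 is an assumption.

```python
from typing import Dict, List, Tuple

LogEntry = Tuple[str, int, int]  # (app_id, pid, epoch)


class ToyTransactionCoordinator:
    def __init__(self, log: List[LogEntry]) -> None:
        self.log = log  # stands in for the transaction topic
        self.mapping: Dict[str, Tuple[int, int]] = {}
        self.next_pid = 0
        # Recovery: rebuild the in-memory mapping by replaying the log.
        for app_id, pid, epoch in log:
            self.mapping[app_id] = (pid, epoch)
            self.next_pid = max(self.next_pid, pid + 1)

    def handle_init_pid(self, app_id: str) -> Tuple[int, int]:
        if app_id in self.mapping:
            # Known AppID: keep the PID, bump the epoch.
            pid, epoch = self.mapping[app_id]
            epoch += 1
        else:
            # New AppID: construct a PID with an initial epoch.
            pid, epoch = self.next_pid, 0
            self.next_pid += 1
        # Persist in BOTH cases before updating the mapping and replying.
        self.log.append((app_id, pid, epoch))
        self.mapping[app_id] = (pid, epoch)
        return pid, epoch


log: List[LogEntry] = []
coordinator = ToyTransactionCoordinator(log)
first = coordinator.handle_init_pid("task-0")   # new entry
second = coordinator.handle_init_pid("task-0")  # epoch bump

# A coordinator that takes over later sees the bumped epoch only because
# the bump was appended to the log.
recovered = ToyTransactionCoordinator(log)
third = recovered.handle_init_pid("task-0")
```

If the epoch bump were kept only in memory, the recovered coordinator in
this model would hand out an epoch that had already been used, which is
the gap the comment above points at.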
> > > > > >
> > > > > > On Wed, Nov 30, 2016 at 9:36 PM, Apurva Mehta <
> apu...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > The first line in step 2 of that section is: "If there is
> already
> > > an
> > > > > > entry
> > > > > > > with the AppID in the mapping, increment the epoch number and
> go
> > on
> > > > to
> > > > > > the
> > > > > > > next step."
> > > > > > >
> > > > > > > Are you suggesting that it be made explicit that 'increment the
> > > epoch
> > > > > > > number' includes persisting the updated value to the log?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Apurva
> > > > > > >
> > > > > > > On Wed, Nov 30, 2016 at 9:21 PM, Onur Karaman <
> > > > > > > onurkaraman.apa...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Nice google doc!
> > > > > > > >
> > > > > > > > Probably need to go over the google doc a few more times,
> but a
> > > > minor
> > > > > > > > comment from the first pass:
> > > > > > > >
> > > > > > > > > In Transaction Coordinator Request Handling (
> > > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.jro89lml46du),
> > > > > > > > step 2 mentions that if the Transaction Coordinator doesn't
> > > already
> > > > > > see a
> > > > > > > > producer with the same app-id, it creates a pid and appends
> > > > (app-id,
> > > > > > pid,
> > > > > > > > epoch) into the transaction log.
> > > > > > > >
> > > > > > > > What about if the app-id/pid pair already exists and we
> > increment
> > > > the
> > > > > > > > epoch? Should we append (app-id, pid, epoch++) to the
> > transaction
> > > > > log?
> > > > > > I
> > > > > > > > think we should, but step 2 doesn't mention this.
> > > > > > > >
> > > > > > > > On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <
> > > apu...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > > Thanks for your comments. Let me deal with your second point
> > > > > > > > > > regarding merging the __consumer_offsets and transaction topics.
> > > > > > > > >
> > > > > > > > > Needless to say, we considered doing this, but chose to
> keep
> > > them
> > > > > > > > separate
> > > > > > > > > for the following reasons:
> > > > > > > > >
> > > > > > > > > > 1. Your assumption that group.id and transaction.app.id can be the
> > > > > > > > > >    same does not hold for streams applications. All colocated tasks
> > > > > > > > > >    of a streams application will share the same consumer (and hence
> > > > > > > > > >    implicitly will have the same group.id), but each task will have
> > > > > > > > > >    its own producer instance. The transaction.app.id for each
> > > > > > > > > >    producer instance will still have to be distinct. So to colocate
> > > > > > > > > >    the transaction and consumer group coordinators, we will have to
> > > > > > > > > >    now introduce a 'group.id' config in the producer and require it
> > > > > > > > > >    to be the same as the consumer. This seemed like a very fragile
> > > > > > > > > >    option.
> > > > > > > > > > 2. Following on from the above, the transaction coordinator and
> > > > > > > > > >    group coordinator would _have_ to be colocated in order to be the
> > > > > > > > > >    leader for the same TopicPartition, unless we wanted to make even
> > > > > > > > > >    more fundamental changes to Kafka.
> > > > > > > > > > 3. We don't require that the consumer coordinator and the
> > > > > > > > > >    transaction coordinator have the same view of the current
> > > > > > > > > >    PID/Epoch pair. If a producer instance is bounced, the epoch will
> > > > > > > > > >    be bumped. Any transactions initiated by the previous instance
> > > > > > > > > >    would either be fully committed or fully rolled back. Since the
> > > > > > > > > >    writes to the offset topics are just like writes to a regular
> > > > > > > > > >    topic, these would enjoy the same guarantees, and the
> > > > > > > > > >    inconsistency will be eventually resolved.
> > > > > > > > > > 4. Finally, every application will have consumers, and hence record
> > > > > > > > > >    consumer offsets. But a very small fraction of applications would
> > > > > > > > > >    use transactions. Blending the two topics would make recovering
> > > > > > > > > >    transaction coordinator state unnecessarily inefficient since it
> > > > > > > > > >    has to read from the beginning of the topic to reconstruct its
> > > > > > > > > >    data structures -- it would have to inspect and skip a majority
> > > > > > > > > >    of the messages if the offsets were in the same topic.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Apurva
> > > > > > > > >
> > > > > > > > > On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <
> > > > n...@confluent.io>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for initiating this KIP! I think it is well
> written
> > > and
> > > > > I'm
> > > > > > > > > excited
> > > > > > > > > > to see the first step towards adding an important feature
> > in
> > > > > Kafka.
> > > > > > > > > >
> > > > > > > > > > > I had a few initial thoughts on the KIP, mostly not as deeply
> > > > > > > > > > > thought through as what you've done -
> > > > > > > > > >
> > > > > > > > > > 1. Perhaps you’ve thought about how this would work
> > already —
> > > > > since
> > > > > > > we
> > > > > > > > > now
> > > > > > > > > > require a producer to specify a unique AppID across
> > different
> > > > > > > instances
> > > > > > > > > of
> > > > > > > > > > an application, how would applications that run in the
> > cloud
> > > > use
> > > > > > this
> > > > > > > > > > feature with auto scaling?
> > > > > > > > > >
> > > > > > > > > > 2. Making it easy for applications to get exactly-once
> > > > semantics
> > > > > > for
> > > > > > > a
> > > > > > > > > > consume-process-produce workflow is a great feature to
> > have.
> > > To
> > > > > > > enable
> > > > > > > > > > this, the proposal now includes letting a producer
> > initiate a
> > > > > write
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > offset topic as well (just like consumers do). The
> consumer
> > > > > > > coordinator
> > > > > > > > > > (which could be on a different broker than the txn
> > > coordinator)
> > > > > > would
> > > > > > > > > then
> > > > > > > > > > > validate that the PID and producer epoch are valid before it
> > > > writes
> > > > > to
> > > > > > > the
> > > > > > > > > > offset topic along with the associated PID. This is a
> great
> > > > > feature
> > > > > > > > > though
> > > > > > > > > > I see 2 difficulties
> > > > > > > > > >
> > > > > > > > > > -- This needs the consumer coordinator to have a
> consistent
> > > > view
> > > > > of
> > > > > > > the
> > > > > > > > > > > PID/epochs that is the same as the view on the txn
> coordinator.
> > > > > > However,
> > > > > > > as
> > > > > > > > > the
> > > > > > > > > > offset and the transaction topics are different, the 2
> > > > > coordinators
> > > > > > > > might
> > > > > > > > > > live on different brokers.
> > > > > > > > > > -- We now also have 2 internal topics - a transaction
> topic
> > > and
> > > > > the
> > > > > > > > > > __consumer_offsets topic.
> > > > > > > > > >
> > > > > > > > > > Maybe you’ve thought about this already and discarded it
> > ...
> > > > let
> > > > > me
> > > > > > > > make
> > > > > > > > > a
> > > > > > > > > > somewhat crazy proposal — Why don’t we upgrade the
> > > transaction
> > > > > > topic
> > > > > > > to
> > > > > > > > > be
> > > > > > > > > > the new offsets topic as well? For consumers that want
> EoS
> > > > > > guarantees
> > > > > > > > for
> > > > > > > > > > a consume-process-produce pattern, the group.id is the
> > same
> > > as
> > > > > the
> > > > > > > > > > transaction.app.id set for the producer. Assume that the
> > > > > > transaction
> > > > > > > > > topic
> > > > > > > > > > also stores consumer offsets. It stores both the
> > transaction
> > > > > > metadata
> > > > > > > > > > messages as well as offset messages, both for
> transactional
> > > as
> > > > > well
> > > > > > > as
> > > > > > > > > > non-transactional consumers. Since the group.id of the
> > > > consumer
> > > > > > and
> > > > > > > > the
> > > > > > > > > > > app.id of the producer are the same, the offsets
> associated
> > > > with
> > > > > a
> > > > > > > > > consumer
> > > > > > > > > > group and topic-partition end up in the same transaction
> > > topic
> > > > > > > > partition
> > > > > > > > > as
> > > > > > > > > > the transaction metadata messages. The transaction
> > > coordinator
> > > > > and
> > > > > > > the
> > > > > > > > > > consumer coordinator always live on the same broker since
> > > they
> > > > > both
> > > > > > > map
> > > > > > > > > to
> > > > > > > > > > the same partition in the transaction topic. Even if
> there
> > > are
> > > > > > > > failures,
> > > > > > > > > > they end up on the same new broker. Hence, they share the
> > > same
> > > > > and
> > > > > > > > > > consistent view of the PIDs, epochs and App IDs, whatever
> > it
> > > > is.
> > > > > > The
> > > > > > > > > > consumer coordinator will skip over the transaction
> > metadata
> > > > > > messages
> > > > > > > > > when
> > > > > > > > > > it bootstraps the offsets from this new topic for
> consumer
> > > > groups
> > > > > > > that
> > > > > > > > > are
> > > > > > > > > > not involved in a transaction and don’t have a txn id
> > > > associated
> > > > > > with
> > > > > > > > the
> > > > > > > > > > offset message in the transaction topic. The consumer
> > > > coordinator
> > > > > > > will
> > > > > > > > > > expose only committed offsets in cases of consumer groups
> > > that
> > > > > are
> > > > > > > > > involved
> > > > > > > > > > in a txn. It will also be able to validate the
> > > > > OffsetCommitRequests
> > > > > > > > > coming
> > > > > > > > > > from a transactional producer by ensuring that it is
> coming
> > > > from
> > > > > a
> > > > > > > > valid
> > > > > > > > > > PID, producer epoch since it uses the same view of this
> > data
> > > > > > created
> > > > > > > by
> > > > > > > > > the
> > > > > > > > > > transaction coordinator (that lives on the same broker).
> > And
> > > we
> > > > > > will
> > > > > > > > end
> > > > > > > > > up
> > > > > > > > > > > with one internal topic, not two.
> > > > > > > > > >
> > > > > > > > > > This proposal offers better operational simplicity and
> > fewer
> > > > > > internal
> > > > > > > > > > topics but there are some downsides that come with it —
> > there
> > > > > are 2
> > > > > > > > types
> > > > > > > > > > of messages in one topic (txn metadata ones and offset
> > ones).
> > > > > Since
> > > > > > > > this
> > > > > > > > > > internal topic serves a dual purpose, it will be harder
> to
> > > name
> > > > > it
> > > > > > > and
> > > > > > > > > also
> > > > > > > > > > design a message format that includes the different types
> > of
> > > > > > messages
> > > > > > > > > that
> > > > > > > > > > will live in the topic. Though the transaction topic
> > already
> > > > > needs
> > > > > > to
> > > > > > > > > write
> > > > > > > > > > 5 different types of messages (the AppID->PID mapping,
> the
> > > > > BeginTxn
> > > > > > > > > > message, InsertTxn, PrepareCommit, Committed/Aborted) so
> > > maybe
> > > > > > adding
> > > > > > > > the
> > > > > > > > > > offset message isn't a big deal?
> > > > > > > > > >
> > > > > > > > > > Back when we introduced the offsets topic, we had
> discussed
> > > > > making
> > > > > > it
> > > > > > > > > more
> > > > > > > > > > general and allowing the producer to send offset commit
> > > > messages
> > > > > to
> > > > > > > it
> > > > > > > > > but
> > > > > > > > > > ended up creating a specialized topic to allow the
> consumer
> > > > > > > coordinator
> > > > > > > > > to
> > > > > > > > > > wall off and prevent unauthorized writes from consumers
> > > outside
> > > > > of
> > > > > > a
> > > > > > > > > group.
> > > > > > > > > > Jason can comment on the specifics but I don't believe
> that
> > > > goal
> > > > > of
> > > > > > > the
> > > > > > > > > new
> > > > > > > > > > consumer protocol was quite achieved.
> > > > > > > > > >
> > > > > > > > > > I have other comments on the message format, request
> names
> > > etc
> > > > > but
> > > > > > > > wanted
> > > > > > > > > > to get your thoughts on these 2 issues first :-)
> > > > > > > > > >
> > > > > > > > > > On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <
> > > > > wangg...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > I have just created KIP-98 to enhance Kafka with
> exactly
> > > once
> > > > > > > > delivery
> > > > > > > > > > > semantics:
> > > > > > > > > > >
> > > > > > > > > > > *
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP->
> > > > > > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > > > > > <
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP->
> > > > > > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > > > > > >*
> > > > > > > > > > >
> > > > > > > > > > > This KIP adds a transactional messaging mechanism along
> > > with
> > > > an
> > > > > > > > > > idempotent
> > > > > > > > > > > producer implementation to make sure that 1) duplicated
> > > > > messages
> > > > > > > sent
> > > > > > > > > > from
> > > > > > > > > > > the same identified producer can be detected on the
> > broker
> > > > > side,
> > > > > > > and
> > > > > > > > > 2) a
> > > > > > > > > > > group of messages sent within a transaction will
> > atomically
> > > > be
> > > > > > > either
> > > > > > > > > > > reflected and fetchable to consumers or not as a whole.
> > > > > > > > > > >
> > > > > > > > > > > The above wiki page provides a high-level view of the
> > > > proposed
> > > > > > > > changes
> > > > > > > > > as
> > > > > > > > > > > well as summarized guarantees. Initial draft of the
> > > detailed
> > > > > > > > > > implementation
> > > > > > > > > > > design is described in this Google doc:
> > > > > > > > > > >
> > > > > > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > We would love to hear your comments and suggestions.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > -- Guozhang
> > > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > Thanks,
> > > > > > > > > > Neha
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
