Hey Neha,

Thanks for the thoughtful questions. I'll try to address the first question
since Apurva addressed the second. Since most readers are probably getting
up to speed with this large proposal, let me first take a step back and
explain why we need the AppID at all. As Confluent tradition demands, I
present you a big wall of text:

Clearly "exactly once" delivery requires resilience to client failures.
When a client crashes or turns into a zombie, another client must
eventually be started to resume the work. There are two problems: 1) we
need to ensure that the old process is actually dead or at least that it
cannot write any more data, and 2) we need to be able to pick up wherever
the last process left off. To do either of these, we need some kind of
identifier to tie the two instances together.

There are only two choices for where this ID comes from: either the user
gives it to us or the server generates it. In the latter case, the user is
responsible for fetching it from the client and persisting it somewhere for
use after failure. We ultimately felt that the most flexible option is to
have the user give it to us. In many applications, there is already a
natural identifier which is already used to divide the workload. For
example, in Kafka Streams and Kafka Connect, we have a taskId. For
applications where there is no natural ID, the user can generate a UUID and
persist it locally, which is as good as having the server generate it.
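
As a rough illustration of that last case, here is a minimal Python sketch. The helper name `load_or_create_app_id` and the idea of keeping the ID in a local file are assumptions made for this example only; nothing in the proposal prescribes how the ID is stored.

```python
import os
import tempfile
import uuid


def load_or_create_app_id(path):
    """Return the AppID stored at `path`, generating and persisting one if absent."""
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as f:
            return f.read().strip()

    app_id = str(uuid.uuid4())
    # Write to a temporary file and rename it into place, so that a crash
    # in the middle of the write cannot leave a partial ID behind.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as f:
        f.write(app_id)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp_path, path)
    return app_id
```

A restarted process that calls the helper with the same path gets the same ID back, which is all the continuity the AppID needs.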

So the AppID is used to provide continuity between the instances of a
producer which are handling a certain workload. One of the early design
decisions we made in this work was to make the delivery guarantees we
provide agnostic of the workload that the producer is assigned. The
producer is not in the business of trying to divide up the work among all
its peers who are participating in the same duty (unlike the consumer, we
don't know anything about where the data comes from). This has huge
implications for "exactly-once" delivery because it puts the burden on the
user to divide the total workload among producer instances and to assign
AppIDs accordingly.

I've been using the term "workload" loosely, but we usually imagine
something like Kafka Connect's notion of a "source partition." A source
partition could be a topic partition if the source is Kafka, or it could be
a database table, a log file, or whatever makes sense for the source of the
data. The point is that it's an independent source of data which can be
assigned to a producer instance.

If the same source partition is always assigned to the producer with the
same AppID, then Kafka transactions will give you "exactly once"
delivery without much additional work. On initialization, the producer will
ensure that 1) any previous producers using that AppID are "fenced" off,
and 2) any transactions which had been started by a previous producer
with that AppID have either completed or aborted.
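
To make the fencing idea concrete, here is a toy in-memory model in Python. It is only a sketch: `ToyCoordinator`, `FencedError`, and the method names are invented for this example and are not the broker or client API. It also simplifies point 2 by always aborting whatever the previous instance left open, whereas the real design may need to complete a transaction that was already being committed.

```python
class FencedError(Exception):
    """Raised when a producer instance with a stale epoch tries to write."""


class ToyCoordinator:
    """In-memory stand-in for the transaction coordinator (illustration only)."""

    def __init__(self):
        self._next_pid = 0
        self._entries = {}    # app_id -> [pid, epoch]
        self._open_txns = {}  # app_id -> records not yet committed
        self.log = []         # committed records

    def init_producer(self, app_id):
        """Register a producer instance and fence any predecessor."""
        if app_id in self._entries:
            # Bumping the epoch invalidates every earlier instance.
            self._entries[app_id][1] += 1
            # Simplification: abort anything the predecessor left open.
            self._open_txns.pop(app_id, None)
        else:
            self._entries[app_id] = [self._next_pid, 0]
            self._next_pid += 1
        pid, epoch = self._entries[app_id]
        return pid, epoch

    def _check_epoch(self, app_id, epoch):
        if self._entries[app_id][1] != epoch:
            raise FencedError(f"epoch {epoch} for {app_id!r} has been fenced")

    def send(self, app_id, epoch, record):
        self._check_epoch(app_id, epoch)
        self._open_txns.setdefault(app_id, []).append(record)

    def commit(self, app_id, epoch):
        self._check_epoch(app_id, epoch)
        self.log.extend(self._open_txns.pop(app_id, []))


coordinator = ToyCoordinator()
_, old_epoch = coordinator.init_producer("copier-orders-0")
coordinator.send("copier-orders-0", old_epoch, "a")  # crash before commit

_, new_epoch = coordinator.init_producer("copier-orders-0")  # replacement
coordinator.send("copier-orders-0", new_epoch, "a")
coordinator.commit("copier-orders-0", new_epoch)
```

After the replacement commits, the toy log holds the record once, and any further `send` with `old_epoch` raises `FencedError`.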

Based on this, it should be clear that the ideal is to divide the workload
so that you have a one-to-one mapping from the source partition to the
AppID. If the source of the data is Kafka, then the source partition is
just a topic partition, and the AppID can be generated from the name of the
topic and the partition number.
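
For instance, a naming scheme along these lines would work (a Python sketch; the `app_id_for` helper and the `name-topic-partition` format are just one hypothetical convention, not something the proposal defines):

```python
def app_id_for(app_name, topic, partition):
    """Build a stable AppID for one source topic partition."""
    return f"{app_name}-{topic}-{partition}"


# One AppID per source partition of a four-partition topic.
app_ids = {p: app_id_for("copier", "orders", p) for p in range(4)}
```

Because the ID depends only on the topic and partition, whichever process picks up a given partition after a failure ends up with the same AppID as its predecessor.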

To finally get back to your auto-scaling question, let's assume for a
moment the ideal mapping of source partition to AppID. The main question is
whether the scaling is "horizontal" or "vertical." By horizontal, I mean an
increase in the number of source partitions. This case is easy. Assign new
AppIDs based on the new source partitions and you're done.

But if the scaling is vertical (i.e. an increase in the load on the source
partitions), there's not much this proposal can do to help. You're going to
have to break the source partition into child partitions, and assign each
of the new partitions a new AppID. To preserve "exactly once" delivery, you
must make sure that the producers using the AppID assigned to the parent
partition have been shut down cleanly. We could provide a way to pass in a
"parent AppID" so that the producer could check the appropriate safety
conditions, but for the first version, we assume that users consider
scaling requirements when dividing the workload into source partitions.

Unfortunately, the real world is always falling short of the ideal, and
it's not always practical to have a one-to-one mapping of source partition
to AppID, since that also implies a one-to-one mapping of source partition
to producer instance. If I were a user, I'd push this limit as far as is
reasonable, but with enough source partitions, it eventually breaks down.
At some point, you need a producer to handle the load of more than one
source partition. This is fine in itself if the assignment is sticky: that
is, if we can ensure that the same source partition is assigned to the
producer using a certain AppID. If not, then the user is responsible for
ensuring a clean hand-off. The producer reading from the migrating source
partition must stop reading, commit or abort any transaction containing
data processed from that source partition, and then signal the producer
which is taking over that it is safe to begin.
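
For the sticky case, one simple approach is to derive the AppID deterministically from the source partition's name. The Python sketch below assumes a fixed list of AppIDs and a hypothetical `sticky_app_id` helper; it is not part of the proposal, and if the list of AppIDs changes, the mapping changes too and the hand-off steps above apply.

```python
import hashlib


def sticky_app_id(source_partition, app_ids):
    """Map a source partition to the same AppID every time, given a fixed list."""
    # Use a stable hash: Python's built-in hash() of a str varies between runs.
    digest = hashlib.sha256(source_partition.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(app_ids)
    return app_ids[index]


app_ids = ["worker-0", "worker-1", "worker-2"]
assignment = {
    name: sticky_app_id(name, app_ids)
    for name in ["db.users", "db.orders", "logs/app.log", "db.payments"]
}
```

Several source partitions may land on the same AppID, which is fine as long as each one keeps landing there across restarts.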

This burden is a consequence of the decision to keep the producer out of
the role of assigning work. We could do more if we forced users to
formalize their application-specific notion of a source partition, and if
we turned the producer into something like a consumer group, with a
rebalance protocol. This would allow the broker to be the one to ensure a
clean hand-off of work, but it would be a huge departure from the way the
producer currently works, and not all applications have a notion of source
partition anyway. So the result is a bit more work for the user, though of
course it would be transparent to Kafka Streams users.

One final note. I've described above how to get the strongest guarantees
that this work is capable of providing in an auto-scaling environment. We
also provide weaker guarantees, which are still an improvement over the
current state. For example, without specifying any kind of AppID, we
provide idempotent production for the lifetime of a producer instance. This
ensures reliable delivery without duplicates even with broker failures. It
is also possible to use transactions with an ephemeral AppID. If the
application generates a UUID for use as the AppID, and only uses it for
the lifetime of a single producer, you can still take advantage of
transactional semantics, which allow you to write a set of messages to
multiple partitions atomically.

Hope that answers the question and helps others understand the work a bit
better!

Thanks,
Jason



On Wed, Nov 30, 2016 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for your comment, I updated the document. Let me know if it is clear
> now.
>
> Apurva
>
> On Wed, Nov 30, 2016 at 9:42 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com>
> wrote:
>
> > @Apurva yep that's what I was trying to say.
> >
> > Original message:
> > If there is already an entry with the AppID in the mapping, increment the
> > epoch number and go on to the next step. If there is no entry with the
> > AppID in the mapping, construct a PID with initialized epoch number;
> append
> > an AppID message into the transaction topic, insert into the mapping and
> > reply with the PID / epoch / timestamp.
> >
> > Just wanted to make it explicit because:
> > 1. The "append an AppID message..." chunk was ambiguous on whether it
> > applied to the "if exists" or "if not exists" condition
> > 2. I think the google doc is pretty explicit on appending to the log
> > everywhere else.
> >
> > On Wed, Nov 30, 2016 at 9:36 PM, Apurva Mehta <apu...@confluent.io>
> wrote:
> >
> > > The first line in step 2 of that section is: "If there is already an
> > entry
> > > with the AppID in the mapping, increment the epoch number and go on to
> > the
> > > next step."
> > >
> > > Are you suggesting that it be made explicit that 'increment the epoch
> > > number' includes persisting the updated value to the log?
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Wed, Nov 30, 2016 at 9:21 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com>
> > > wrote:
> > >
> > > > Nice google doc!
> > > >
> > > > Probably need to go over the google doc a few more times, but a minor
> > > > comment from the first pass:
> > > >
> > > > In Transaction Coordinator Request Handling (
> > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.jro89lml46du),
> > > > step 2 mentions that if the Transaction Coordinator doesn't already
> > see a
> > > > producer with the same app-id, it creates a pid and appends (app-id,
> > pid,
> > > > epoch) into the transaction log.
> > > >
> > > > What about if the app-id/pid pair already exists and we increment the
> > > > epoch? Should we append (app-id, pid, epoch++) to the transaction
> log?
> > I
> > > > think we should, but step 2 doesn't mention this.
> > > >
> > > > On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <apu...@confluent.io>
> > > wrote:
> > > >
> > > > > Thanks for your comments, let me deal with your second point
> > regarding
> > > > > merging the __consumer-offsets and transactions topic.
> > > > >
> > > > > Needless to say, we considered doing this, but chose to keep them
> > > > separate
> > > > > for the following reasons:
> > > > >
> > > > >    1. Your assumption that group.id and transaction.app.id can be
> > the
> > > > same
> > > > >    does not hold for streams applications. All colocated tasks of a
> > > > streams
> > > > >    application will share the same consumer (and hence implicitly
> > will
> > > > have
> > > > >    the same group.id), but each task will have its own producer
> > > > instance.
> > > > >    The transaction.app.id for each producer instance will still
> have
> > > to
> > > > be
> > > > >    distinct. So to colocate the transaction and consumer group
> > > > > coordinators,
> > > > >    we will have to now introduce a 'group.id' config in the
> producer
> > > and
> > > > >    require it to be the same as the consumer. This seemed like a
> very
> > > > > fragile
> > > > >    option.
> > > > >    2. Following on from the above, the transaction coordinator and
> > > group
> > > > > >    coordinator would _have_ to be colocated in order to be the
> leader
> > > for
> > > > > the
> > > > >    same TopicPartition, unless we wanted to make even more
> > fundamental
> > > > > changes
> > > > >    to Kafka.
> > > > >    3. We don't require that the consumer coordinator and the
> > > transaction
> > > > >    coordinator have the same view of the current PID/Epoch pair.
> If a
> > > > > producer
> > > > >    instance is bounced, the epoch will be bumped. Any transactions
> > > > > initiated
> > > > >    by the previous instance would either be fully committed or
> fully
> > > > rolled
> > > > >    back. Since the writes to the offset topics are just like writes
> > to
> > > a
> > > > >    regular topic, these would enjoy the same guarantees, and the
> > > > > inconsistency
> > > > >    will be eventually resolved.
> > > > >    4. Finally, every application will have consumers, and hence
> > record
> > > > >    consumer offsets. But a very small fraction of applications
> would
> > > use
> > > > >    transactions. Blending the two topics would make recovering
> > > > transaction
> > > > >    coordinator state unnecessarily inefficient since it has to read
> > > from
> > > > > the
> > > > >    beginning of the topic to reconstruct its data structures -- it
> > > would
> > > > > have
> > > > >    to inspect and skip a majority of the messages if the offsets
> were
> > > in
> > > > > the
> > > > >    same topic.
> > > > >
> > > > > Thanks,
> > > > > Apurva
> > > > >
> > > > > On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <n...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Thanks for initiating this KIP! I think it is well written and
> I'm
> > > > > excited
> > > > > > to see the first step towards adding an important feature in
> Kafka.
> > > > > >
> > > > > > I had a few initial thoughts on the KIP, mostly not as deeply
> > thought
> > > > > > through than what you've done -
> > > > > >
> > > > > > 1. Perhaps you’ve thought about how this would work already —
> since
> > > we
> > > > > now
> > > > > > require a producer to specify a unique AppID across different
> > > instances
> > > > > of
> > > > > > an application, how would applications that run in the cloud use
> > this
> > > > > > feature with auto scaling?
> > > > > >
> > > > > > 2. Making it easy for applications to get exactly-once semantics
> > for
> > > a
> > > > > > consume-process-produce workflow is a great feature to have. To
> > > enable
> > > > > > this, the proposal now includes letting a producer initiate a
> write
> > > to
> > > > > the
> > > > > > offset topic as well (just like consumers do). The consumer
> > > coordinator
> > > > > > (which could be on a different broker than the txn coordinator)
> > would
> > > > > then
> > > > > > validate if the PID and producer epoch is valid before it writes
> to
> > > the
> > > > > > offset topic along with the associated PID. This is a great
> feature
> > > > > though
> > > > > > I see 2 difficulties
> > > > > >
> > > > > > -- This needs the consumer coordinator to have a consistent view
> of
> > > the
> > > > > > PID/epochs that is same as the view on the txn coordinator.
> > However,
> > > as
> > > > > the
> > > > > > offset and the transaction topics are different, the 2
> coordinators
> > > > might
> > > > > > live on different brokers.
> > > > > > -- We now also have 2 internal topics - a transaction topic and
> the
> > > > > > __consumer_offsets topic.
> > > > > >
> > > > > > Maybe you’ve thought about this already and discarded it ... let
> me
> > > > make
> > > > > a
> > > > > > somewhat crazy proposal — Why don’t we upgrade the transaction
> > topic
> > > to
> > > > > be
> > > > > > the new offsets topic as well? For consumers that want EoS
> > guarantees
> > > > for
> > > > > > a consume-process-produce pattern, the group.id is the same as
> the
> > > > > > transaction.app.id set for the producer. Assume that the
> > transaction
> > > > > topic
> > > > > > also stores consumer offsets. It stores both the transaction
> > metadata
> > > > > > messages as well as offset messages, both for transactional as
> well
> > > as
> > > > > > non-transactional consumers. Since the group.id of the consumer
> > and
> > > > the
> > > > > > app.id of the producer is the same, the offsets associated with
> a
> > > > > consumer
> > > > > > group and topic-partition end up in the same transaction topic
> > > > partition
> > > > > as
> > > > > > the transaction metadata messages. The transaction coordinator
> and
> > > the
> > > > > > consumer coordinator always live on the same broker since they
> both
> > > map
> > > > > to
> > > > > > the same partition in the transaction topic. Even if there are
> > > > failures,
> > > > > > they end up on the same new broker. Hence, they share the same
> and
> > > > > > consistent view of the PIDs, epochs and App IDs, whatever it is.
> > The
> > > > > > consumer coordinator will skip over the transaction metadata
> > messages
> > > > > when
> > > > > > it bootstraps the offsets from this new topic for consumer groups
> > > that
> > > > > are
> > > > > > not involved in a transaction and don’t have a txn id associated
> > with
> > > > the
> > > > > > offset message in the transaction topic. The consumer coordinator
> > > will
> > > > > > expose only committed offsets in cases of consumer groups that
> are
> > > > > involved
> > > > > > in a txn. It will also be able to validate the
> OffsetCommitRequests
> > > > > coming
> > > > > > from a transactional producer by ensuring that it is coming from
> a
> > > > valid
> > > > > > PID, producer epoch since it uses the same view of this data
> > created
> > > by
> > > > > the
> > > > > > transaction coordinator (that lives on the same broker). And we
> > will
> > > > end
> > > > > up
> > > > > > > with one internal topic, not two.
> > > > > >
> > > > > > This proposal offers better operational simplicity and fewer
> > internal
> > > > > > topics but there are some downsides that come with it — there
> are 2
> > > > types
> > > > > > of messages in one topic (txn metadata ones and offset ones).
> Since
> > > > this
> > > > > > internal topic serves a dual purpose, it will be harder to name
> it
> > > and
> > > > > also
> > > > > > design a message format that includes the different types of
> > messages
> > > > > that
> > > > > > will live in the topic. Though the transaction topic already
> needs
> > to
> > > > > write
> > > > > > 5 different types of messages (the AppID->PID mapping, the
> BeginTxn
> > > > > > message, InsertTxn, PrepareCommit, Committed/Aborted) so maybe
> > adding
> > > > the
> > > > > > offset message isn't a big deal?
> > > > > >
> > > > > > Back when we introduced the offsets topic, we had discussed
> making
> > it
> > > > > more
> > > > > > general and allowing the producer to send offset commit messages
> to
> > > it
> > > > > but
> > > > > > ended up creating a specialized topic to allow the consumer
> > > coordinator
> > > > > to
> > > > > > wall off and prevent unauthorized writes from consumers outside
> of
> > a
> > > > > group.
> > > > > > Jason can comment on the specifics but I don't believe that goal
> of
> > > the
> > > > > new
> > > > > > consumer protocol was quite achieved.
> > > > > >
> > > > > > I have other comments on the message format, request names etc
> but
> > > > wanted
> > > > > > to get your thoughts on these 2 issues first :-)
> > > > > >
> > > > > > On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I have just created KIP-98 to enhance Kafka with exactly once
> > > > delivery
> > > > > > > semantics:
> > > > > > >
> > > > > > > *
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > <
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > >*
> > > > > > >
> > > > > > > This KIP adds a transactional messaging mechanism along with an
> > > > > > idempotent
> > > > > > > producer implementation to make sure that 1) duplicated
> messages
> > > sent
> > > > > > from
> > > > > > > the same identified producer can be detected on the broker
> side,
> > > and
> > > > > 2) a
> > > > > > > group of messages sent within a transaction will atomically be
> > > either
> > > > > > > reflected and fetchable to consumers or not as a whole.
> > > > > > >
> > > > > > > The above wiki page provides a high-level view of the proposed
> > > > changes
> > > > > as
> > > > > > > well as summarized guarantees. Initial draft of the detailed
> > > > > > implementation
> > > > > > > design is described in this Google doc:
> > > > > > >
> > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > > > > >
> > > > > > >
> > > > > > > We would love to hear your comments and suggestions.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > -- Guozhang
> > > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > >
> > > >
> > >
> >
>
