@Jay

1. Streams' application.id is shared among all instances of the app and is
used as part of the consumer group id, while "app.id" is per producer
instance. So a Streams app with a single application.id config will
likely contain multiple producers, each with a different app.id based on
its corresponding task ID.

2. Another motivation, besides the one Jason pointed out (making sure
transaction-involved offsets have been committed before resuming), is that
we also want to separate the "app.id" config from the transactional
mechanism. More concretely, if a user specifies the "app.id" config
without using the transaction functions (i.e. initTransactions, beginTxn, etc.),
they still get an idempotency guarantee across multiple sessions of the
producer identified by that app.id.

4. We thought about the PID length. Note that since we do not expire PIDs,
we expect the PID space to cover all producers that we have ever seen,
and producers without an "app.id" can come and go with different PIDs. That
is why we feel 4 billion may not be sufficient.
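
To make the sizing argument in point 4 concrete, here is a rough
back-of-the-envelope sketch in Java. It assumes PIDs are handed out
sequentially and never reused (in line with "we do not expire PIDs"), and it
uses a purely hypothetical cluster-wide rate of 1,000 new PIDs per second.
The class and method names are made up for illustration and are not part of
the KIP.

```java
import java.math.BigInteger;

// Back-of-the-envelope sizing for a PID space that is never expired or reused.
// All names and the allocation rate below are hypothetical illustrations.
class PidSpace {
    private static final long SECONDS_PER_DAY = 24L * 60 * 60;

    // Number of distinct non-negative IDs in a field of the given width.
    // A signed field loses one bit to the sign.
    static BigInteger capacity(int bytes, boolean signed) {
        int bits = bytes * 8 - (signed ? 1 : 0);
        return BigInteger.ONE.shiftLeft(bits);
    }

    // Whole days until the space is used up at a constant allocation rate.
    static BigInteger daysUntilExhausted(BigInteger capacity, long newPidsPerSecond) {
        BigInteger perDay = BigInteger.valueOf(newPidsPerSecond)
                .multiply(BigInteger.valueOf(SECONDS_PER_DAY));
        return capacity.divide(perDay);
    }

    public static void main(String[] args) {
        long assumedRate = 1_000; // hypothetical new PIDs per second, cluster-wide
        System.out.println("4-byte signed:   "
                + daysUntilExhausted(capacity(4, true), assumedRate) + " days");
        System.out.println("4-byte unsigned: "
                + daysUntilExhausted(capacity(4, false), assumedRate) + " days");
        System.out.println("8-byte signed:   "
                + daysUntilExhausted(capacity(8, true), assumedRate) + " days");
    }
}
```

At that assumed rate a 4-byte PID would last 24 days (signed, about 2
billion values) or 49 days (unsigned, about 4 billion values), while an
8-byte PID is effectively inexhaustible. The rate is deliberately
aggressive; the point is only that a space which is never reclaimed is
consumed by every producer ever seen, not by the number of concurrent ones.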



Guozhang


On Thu, Dec 1, 2016 at 11:10 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Jay,
>
> Thanks for the questions! Let me take a couple of them.
>
> > 2. The initTransactions() call is a little annoying. Can we get rid of
> >    that and call it automatically if you set a transaction.app.id when we
> >    do the first message send as we do with metadata? Arguably we should have
> >    included a general connect() or init() call in the producer, but given that
> >    we didn't do this it seems weird that the cluster metadata initializes
> >    automatically on demand and the transaction metadata doesn't.
>
>
> The purpose of this call is to fence off any producer with the same AppID
> and await the completion of any pending transactions. When it returns, you
> know that your producer is safe to resume work. Take the "consume and
> produce" use case as an example. We send the offset commits as part of the
> producer's transaction (approximating the idea that it is "just another
> write to a partition"). When you first initialize the application, you have
> to determine when it's safe for the consumer to read those offsets.
> Otherwise, you may read stale offsets before a transaction which is rolling
> forward is able to write the marker to __consumer_offsets. So we can't do
> the initialization in send() because that would assume that we had already
> read data from the consumer, which we can't do until we've initialized the
> producer. Does that make sense?
>
> (For what it's worth, we're not married to this name or any of the others,
> so anyone can feel free to suggest alternatives.)
>
>
> > 5. One implication of factoring out the message set seems to be you
> >    can't ever "repack" messages to improve compression beyond what is done by
> >    the producer. We'd talked about doing this either by buffering when writing
> >    or during log cleaning. This isn't a show stopper but I think one
> >    implication is that we won't be able to do this. Furthermore with log
> >    cleaning you'd assume that over time ALL messages would collapse down to a
> >    single wrapper as compaction removes the others.
>
>
> Yeah, that's a fair point. You may still be able to do some merging if
> adjacent message sets have the same PID, but the potential savings might
> not be worth the cost of implementation. My gut feeling is that merging
> message sets from different producers may not be a great idea anyway (you'd
> have to accept the fact that you always need "deep iteration" to find the
> PIDs contained within the message set), but I haven't thought a ton about
> it. Ultimately we'll have to decide if the potential for savings in the
> future is worth some loss in efficiency now (for what it's worth, I think
> the work that Ben has been looking at also hopes to bundle some more
> information into the message set header).
>
> On a purely pragmatic development level, after spending a ton of recent
> time working with that code, I can say that the benefit of having a
> conceptually simpler message format is huge. It allows you to converge the
> paths for validation of message sets on the broker, for example. Currently,
> we pretty much need two separate paths everywhere we process messages. It
> can be tricky just to tell if the message you're dealing with is the inner
> or outer message, and whether it matters or not. Also, the fact that the
> inner and outer messages share common fields makes bugs like KAFKA-4298
> <https://issues.apache.org/jira/browse/KAFKA-4298> possible. The risk of
> these bugs is much smaller when you can clearly separate the fields allowed
> in the message set header and those in the messages.
>
>
> Thanks,
> Jason
>
> On Thu, Dec 1, 2016 at 8:19 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Looks great!
> >
> > A few questions:
> >
> >    1. What is the relationship between transaction.app.id and the existing
> >    config application.id in streams?
> >    2. The initTransactions() call is a little annoying. Can we get rid of
> >    that and call it automatically if you set a transaction.app.id when we
> >    do the first message send as we do with metadata? Arguably we should have
> >    included a general connect() or init() call in the producer, but given that
> >    we didn't do this it seems weird that the cluster metadata initializes
> >    automatically on demand and the transaction metadata doesn't.
> >    3. The equivalent concept of what we call "fetch.mode" in databases is
> >    called "isolation level" and takes values like "serializable", "read
> >    committed", "read uncommitted". Since we went with transaction as the name
> >    for the thing in between the begin/commit might make sense to use this
> >    terminology for the concept and levels? I think the behavior we are
> >    planning is "read committed" and the alternative re-ordering behavior is
> >    equivalent to "serializable"?
> >    4. Can the PID be made 4 bytes if we handle roll-over gracefully? 2
> >    billion concurrent producers should be enough for anyone, right?
> >    5. One implication of factoring out the message set seems to be you
> >    can't ever "repack" messages to improve compression beyond what is done by
> >    the producer. We'd talked about doing this either by buffering when writing
> >    or during log cleaning. This isn't a show stopper but I think one
> >    implication is that we won't be able to do this. Furthermore with log
> >    cleaning you'd assume that over time ALL messages would collapse down to a
> >    single wrapper as compaction removes the others.
> >
> > -Jay
> >
> > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-98 to enhance Kafka with exactly once delivery
> > > semantics:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > This KIP adds a transactional messaging mechanism along with an idempotent
> > > producer implementation to make sure that 1) duplicated messages sent from
> > > the same identified producer can be detected on the broker side, and 2) a
> > > group of messages sent within a transaction will atomically be either
> > > reflected and fetchable to consumers or not as a whole.
> > >
> > > The above wiki page provides a high-level view of the proposed changes as
> > > well as summarized guarantees. Initial draft of the detailed implementation
> > > design is described in this Google doc:
> > >
> > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > >
> > >
> > > We would love to hear your comments and suggestions.
> > >
> > > Thanks,
> > >
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang
