Hi Apurva,

Thanks for the reply. When I was thinking of exactly once I am thinking of
"exactly once with availability", Users probably wouldn't want to sacrifice
availability for exactly once. To achieve exactly once with same
availability and acks=all, users actually need to pay more cost. To
tolerate one broker failure, one has to set replication.factor to at least
3 and min.isr to at least 2. Do you mean we should also set those to
default value? Would it be a little weird because redundancy level is a
pretty customized decision so there is no one single correct default
configuration for that.

The concern I have is that acks=-1 is not only associated with exactly once
semantic. I am not sure if the side effect it brings justifies a default
config, such as performance, cost, etc.

>From users' perspective, when idempotence=true and
max.in.flight.requests.per.connection > 0, ideally what acks=1 should
really mean is that "as long as there is no hardware failure, my message is
sent exactly once". Do you think this semantic is good enough as a default
configuration to ship? It is unfortunate this statement is not true today
as even when we do leader migration without any broker failure, the leader
will naively truncate the data that has not been replicated. It is a long
existing issue and we should try to fix that.

For the max.in.flight.requests.per.connection, can you elaborate a little
on "Given the nature of the idempotence feature, we have to bound it.".
What is the concern here? It seems that when nothing wrong happens,
pipelining should just work. And the memory is bounded by the memory buffer
pool anyways. Sure one has to resend all the subsequent batches if one
batch is out of sequence, but that should be rare and we probably should
not optimize for that.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 11, 2017 at 2:08 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for your email Becket. I would be interested in hearing others
> opinions on which should be a better default between acks=1 and acks=all.
>
> One important point on which I disagree is your statement that 'users need
> to do a lot of work to get exactly-once with acks=all'. This is debatable.
> If we enable acks=all,  and if we ship with sane topic-level configs (like
> disabling unclean leader election), then users will get produce exceptions
> with the default settings only for authorization and config exceptions, or
> exceptions due to correlated hard failures or software bugs (assuming
> replication-factor > 1, which is when acks=all and acks=1 differ). This
> should be sufficiently rare that expecting apps to shut down and have
> manual intervention to ensure data consistency is not unreasonable.
>
> So users will not have to have complicated code to ensure exactly-once in
> their app with my proposed defaults: just shut down the producer when a
> `send` returns an error and check manually if you really care about
> exactly-once. The latter should happen so rarely that I argue that it would
> be worth the cost. And if all else fails, there are still ways to recover
> automatically, but those are then very complex as you pointed out.
>
> Regarding max.in.flight: again, given the nature of the idempotence
> feature, we have to bound it. One trade off is that if you have this
> cross-dc use case with extremely high client/broker latency, you either
> accept lower performance with idempotence (and max.in.flight=5), or disable
> idempotence and keep max.in.flight at 20 or whatever. I think this is a
> fair tradeoff.
>
> Thanks,
> Apurva
>
>
> On Fri, Aug 11, 2017 at 11:45 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Apurva,
> >
> > I agree that most changes we are talking about here are for default
> values
> > of the configurations and users can always override them. So I think the
> > question to ask is more about the out of the box experience. If the
> change
> > makes strict improvement compared with the current settings, that would
> > make a lot of sense. (e.g. idempotence + pipelined produce requests). On
> > the other hand, if the out of the box experience is not strictly
> improved,
> > but just default to address another scenario, we may need to think about
> > that a bit more (e.g. acks=all).
> >
> > The way I view this is the following: For the users who wants exactly
> once,
> > they need to do a lot of extra work even if we do all the right
> > configurations. That means for those users, they need to understand all
> the
> > failure cases and properly handle them. For those users, they probably
> > already understand (or at least needs to understand) how to configure the
> > cluster. So providing the default configurations for them do not provide
> > much additional benefit. For the other users, who care about low latency
> > and high throughput but not require the most strong semantic, shipping
> the
> > default settings to be the strong semantic at the cost of latency and
> > throughput will force them to look into the configurations and tune for
> > throughput and latency, which is something they don't need to in the
> > previous versions. Therefore, I feel it may not be necessary to ship
> Kafka
> > with the strongest guarantee.
> >
> > In terms of the max.in.flight.request. In some long latency pipeline,
> (e.g
> > cross ocean pipeline), the latency could be a couple of hundreds ms.
> > Assuming we have 10 Gbps bandwidth and 10 MB average produce request
> size.
> > When the latency is 200 ms, because each requests takes about 10 ms to
> > send, we need to have max.in.flight.requests ~ 20 in order to fully
> utilize
> > the network bandwidth. When the requests are smaller, we will need to
> > pipeline more requests.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> > On Thu, Aug 10, 2017 at 10:43 PM, Apurva Mehta <apu...@confluent.io>
> > wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for your comments.
> > >
> > > Yes, with retries=MAX_INT, producer.flush() may block. I think there
> are
> > > two solutions: a good one would be to adopt some form of KIP-91 to
> bound
> > > the time a message can remain unacknowledged. Alternately, we could set
> > the
> > > default retries to 10 or something. I prefer implementing KIP-91 along
> > with
> > > this KIP to solve this problem, but it isn't a strong dependency.
> > >
> > > Yes, OutOfOrderSequence is a new exception. It indicates a previously
> > > acknowledged message was lost. This could happen even today, but there
> is
> > > no way for the client to detect it. With KIP-98 and the new sequence
> > > numbers, we can. If applications ignore it, they would have the same
> > > behavior as the already have, except with the explicit knowledge that
> > > something has been lost.
> > >
> > > Finally, from my perspective, the best the reason to make acks=all the
> > > default is that it would be a coherent default to have. Along with
> > enabling
> > > idempotence, acks=all, and retries=MAX_INT would mean that acknowledged
> > > messages would appear in the log exactly once. The 'fatal' exceptions
> > would
> > > be either AuthorizationExceptions, ConfigExceptions, or rare data loss
> > > issues due to concurrent failures or software bugs. So while this is
> not
> > a
> > > guarantee of exactly once, it is practically as close to it as you can
> > get.
> > > I think this is a strong enough reason to enable acks=all.
> > >
> > > Thanks,
> > > Apurva
> > >
> > >
> > > On Thu, Aug 10, 2017 at 1:04 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Apurva,
> > > >
> > > > Thanks for the KIP. I have read through the KIP and the prior
> > discussion
> > > in
> > > > this thread. I have three concerns that are related to Becket's
> > comments:
> > > >
> > > > - Is it true that, as Becket has mentioned, producer.flush() may
> block
> > > > infinitely if retries=MAX_INT? This seems like a possible reason to
> > break
> > > > user's application. I think we probably should avoid causing
> > correctness
> > > > penalty for application.
> > > >
> > > > - It seems that OutOfOrderSequenceException will be a new exception
> > > thrown
> > > > to user after this config change. Can you clarify whether this will
> > cause
> > > > correctness penalty for application?
> > > >
> > > > - It is not very clear to me whether the benefit of increasing acks
> > from
> > > 1
> > > > to all is worth the performance hit. For users who have not already
> > > > overridden acks to all, it is very likely that they are not already
> > doing
> > > > other complicated work (e.g. close producer in callback) that are
> > > necessary
> > > > for exactly-once delivery. Thus those users won't have exactly-once
> > > > semantics by simply picking up the change in the default acks
> > > > configuration. It seems that the only benefit of this config change
> is
> > > the
> > > > well-known tradeoff between performance and message loss rate. I am
> not
> > > > sure this is a strong reason to risk reducing existing user's
> > > performance.
> > > >
> > > > I think my point is that we should not to make change that will break
> > > > user's existing application. And we should try to avoid reducing
> user's
> > > > performance unless there is strong benefit of doing so (e.g.
> > > exactly-once).
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Aug 9, 2017 at 10:43 PM, Apurva Mehta <apu...@confluent.io>
> > > wrote:
> > > >
> > > > > Thanks for your email Becket.
> > > > >
> > > > > Your observations around using acks=1 and acks=-1 are correct. Do
> > note
> > > > that
> > > > > getting an OutOfOrderSequence means that acknowledged data has been
> > > lost.
> > > > > This could be due to a weaker acks setting like acks=1 or due to a
> > > topic
> > > > > which is not configured to handle broker failures cleanly (unclean
> > > leader
> > > > > election is enabled, etc.). Either way, you are right in observing
> > that
> > > > if
> > > > > an app is very serious about having exactly one copy of each ack'd
> > > > message
> > > > > in the log, it is a significant effort to recover from this error.
> > > > >
> > > > > However, I propose an alternate way of thinking about this: is it
> > > > > worthwhile shipping Kafka with the defaults tuned for strong
> > semantics?
> > > > > That is essentially what is being proposed here, and of course
> there
> > > will
> > > > > be tradeoffs with performance and deployment costs-- you can't have
> > > your
> > > > > cake and eat it too.
> > > > >
> > > > > And if we want to ship Kafka with strong semantics by default, we
> > might
> > > > > want to make the default topic level settings as well as the client
> > > > > settings more robust. This means, for instance, disabling unclean
> > > leader
> > > > > election by default. If there are other configs we need to change
> on
> > > the
> > > > > broker side to ensure that ack'd messages are not lost due to
> > transient
> > > > > failures, we should change those as well as part of a future KIP.
> > > > >
> > > > > Personally, I think that the defaults should provide robust
> > guarantees.
> > > > >
> > > > > And this brings me to another point: these are just proposed
> > defaults.
> > > > > Nothing is being taken away in terms of flexibility to tune for
> > > different
> > > > > behavior.
> > > > >
> > > > > Finally, the way idempotence is implemented means that there needs
> to
> > > be
> > > > > some cap on max.in.flight when idempotence is enabled -- that is
> > just a
> > > > > tradeoff of the feature. Do we have any data that there are
> > > installations
> > > > > which benefit greatly for a value of max.in.flight > 5? For
> instance,
> > > > > LinkedIn probably has the largest and most demanding deployment of
> > > Kafka.
> > > > > Are there any applications which use max.inflight > 5? That would
> be
> > > good
> > > > > data to have.
> > > > >
> > > > > Thanks,
> > > > > Apurva
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 9, 2017 at 2:59 PM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > > >
> > > > > > Thanks for the KIP, Apurva. It is a good time to review the
> > > > > configurations
> > > > > > to see if we can improve the user experience. We also might need
> to
> > > > think
> > > > > > from users standpoint about the out of the box experience.
> > > > > >
> > > > > > 01. Generally speaking, I think it makes sense to make
> > > idempotence=true
> > > > > so
> > > > > > we can enable producer side pipeline without ordering issue.
> > However,
> > > > the
> > > > > > impact is that users may occasionally receive
> > > > OutOfOrderSequencException.
> > > > > > In this case, there is not much user can do if they want to
> ensure
> > > > > > ordering. They basically have to close the producer in the call
> > back
> > > > and
> > > > > > resend all the records that are in the RecordAccumulator. This is
> > > very
> > > > > > involved. And the users may not have a way to retrieve the
> Records
> > in
> > > > the
> > > > > > accumulator anymore. So for the users who really want to achieve
> > the
> > > > > > exactly once semantic, there are actually still a lot of work to
> do
> > > > even
> > > > > > with those default. For the rest of the users, they need to
> handle
> > > one
> > > > > more
> > > > > > exception, which might not be a big deal.
> > > > > >
> > > > > > 02. Setting acks=-1 would significantly reduce the likelihood of
> > > > > > OutOfOrderSequenceException from happening. However, the
> > > > > latency/throughput
> > > > > > impact and additional purgatory burden on the broker are big
> > > concerns.
> > > > > And
> > > > > > it does not really guarantee exactly once without broker side
> > > > > > configurations. i.e unclean.leader.election, min.isr, etc. I am
> not
> > > > sure
> > > > > if
> > > > > > it is worth making acks=-1 a global config instead of letting the
> > > users
> > > > > who
> > > > > > are really care about this to configure correctly.
> > > > > >
> > > > > > 03. Regarding retries, I think we had some discussion in KIP-91.
> > The
> > > > > > problem of setting retries to max integer is that
> producer.flush()
> > > may
> > > > > take
> > > > > > forever. Will this KIP be depending on KIP-91?
> > > > > >
> > > > > > I am not sure about having a cap on the max.in.flight.requests.
> It
> > > > seems
> > > > > > that on some long RTT link, sending more requests in the pipeline
> > > would
> > > > > be
> > > > > > the only way to keep the latency to be close to RTT.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 9, 2017 at 11:28 AM, Apurva Mehta <
> apu...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Thanks for the comments Ismael and Jason.
> > > > > > >
> > > > > > > Regarding the OutOfOrderSequenceException, it is more likely
> when
> > > you
> > > > > > > enable idempotence and have acks=1, simply because you have a
> > > greater
> > > > > > > probability of losing acknowledged data with acks=1, and the
> > error
> > > > code
> > > > > > > indicates that.
> > > > > > >
> > > > > > > The particular scenario is that a broker acknowledges a message
> > > with
> > > > > > > sequence N before replication happens, and then crashes. Since
> > the
> > > > > > message
> > > > > > > was acknowledged the producer increments its sequence to N+1.
> The
> > > new
> > > > > > > leader would not have received the message, and still expects
> > > > sequence
> > > > > N
> > > > > > > from the producer. When it receives N+1 for the next message,
> it
> > > will
> > > > > > > return an OutOfOrderSequenceNumber, correctl/y indicating some
> > > > > previously
> > > > > > > acknowledged messages are missing.
> > > > > > >
> > > > > > > For the idempotent producer alone, the
> > OutOfOrderSequenceException
> > > is
> > > > > > > returned in the Future and Callback, indicating to the
> > application
> > > > that
> > > > > > > some acknowledged data was lost. However, the application can
> > > > continue
> > > > > > > producing data using the producer instance. The only
> > compatibility
> > > > > issue
> > > > > > > here is that the application will now see a new exception for a
> > > state
> > > > > > which
> > > > > > > went previously undetected.
> > > > > > >
> > > > > > > For a transactional producer, an OutOfOrderSequenceException is
> > > fatal
> > > > > and
> > > > > > > the application must use a new instance of the producer.
> > > > > > >
> > > > > > > Another point about acks=1 with enable.idempotence=true. What
> > > > semantics
> > > > > > are
> > > > > > > we promising here? Essentially we are saying that the default
> > mode
> > > > > would
> > > > > > be
> > > > > > > 'if a message is in the log, it will occur only once, but all
> > > > > > acknowledged
> > > > > > > messages may not make it to the log'. I don't think that this
> is
> > a
> > > > > > > desirable default guarantee.
> > > > > > >
> > > > > > > I will update the KIP to indicate that with the new default,
> > > > > applications
> > > > > > > might get a new 'OutOfOrderSequenceException'.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Apurva
> > > > > > >
> > > > > > > On Wed, Aug 9, 2017 at 9:33 AM, Ismael Juma <ism...@juma.me.uk
> >
> > > > wrote:
> > > > > > >
> > > > > > > > Hi Jason,
> > > > > > > >
> > > > > > > > Thanks for the correction. See inline.
> > > > > > > >
> > > > > > > > On Wed, Aug 9, 2017 at 5:13 PM, Jason Gustafson <
> > > > ja...@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Minor correction: the OutOfOrderSequenceException is not
> > fatal
> > > > for
> > > > > > the
> > > > > > > > > idempotent producer and it is not necessarily tied to the
> > acks
> > > > > > setting
> > > > > > > > > (though it is more likely to be thrown with acks=1).
> > > > > > > >
> > > > > > > >
> > > > > > > > Right, it would be worth expanding on the specifics of this.
> My
> > > > > > > > understanding is that common failure scenarios could trigger
> > it.
> > > > > > > >
> > > > > > > >
> > > > > > > > > It is used to signal
> > > > > > > > > the user that there was a gap in the delivery of messages.
> > You
> > > > can
> > > > > > hit
> > > > > > > > this
> > > > > > > > > if there is a pause on the producer and the topic retention
> > > kicks
> > > > > in
> > > > > > > and
> > > > > > > > > deletes the last records the producer had written. However,
> > it
> > > is
> > > > > > > > possible
> > > > > > > > > for the user to catch it and simply keep producing
> > (internally
> > > > the
> > > > > > > > producer
> > > > > > > > > will generate a new ProducerId).
> > > > > > > >
> > > > > > > >
> > > > > > > > I see, our documentation states that it's fatal in the
> > following
> > > > > > example
> > > > > > > > and in the `send` method. I had overlooked that this was
> > > mentioned
> > > > in
> > > > > > the
> > > > > > > > context of transactions. If we were to enable idempotence by
> > > > default,
> > > > > > > we'd
> > > > > > > > want to flesh out the docs for idempotence without
> > transactions.
> > > > > > > >
> > > > > > > > * try {
> > > > > > > > *     producer.beginTransaction();
> > > > > > > > *     for (int i = 0; i < 100; i++)
> > > > > > > > *         producer.send(new ProducerRecord<>("my-topic",
> > > > > > > > Integer.toString(i), Integer.toString(i)));
> > > > > > > > *     producer.commitTransaction();
> > > > > > > > * } catch (ProducerFencedException |
> > OutOfOrderSequenceException
> > > |
> > > > > > > > AuthorizationException e) {
> > > > > > > > *     // We can't recover from these exceptions, so our only
> > > option
> > > > > is
> > > > > > > > to close the producer and exit.
> > > > > > > > *     producer.close();
> > > > > > > > * } catch (KafkaException e) {
> > > > > > > > *     // For all other exceptions, just abort the transaction
> > and
> > > > try
> > > > > > > > again.
> > > > > > > > *     producer.abortTransaction();
> > > > > > > > * }
> > > > > > > > * producer.close();
> > > > > > > >
> > > > > > > > Nevertheless, pre-idempotent-producer code
> > > > > > > > > won't be expecting this exception, and that may cause it to
> > > break
> > > > > in
> > > > > > > > cases
> > > > > > > > > where it previously wouldn't. This is probably the biggest
> > risk
> > > > of
> > > > > > the
> > > > > > > > > change.
> > > > > > > > >
> > > > > > > >
> > > > > > > > This is a good point and we should include it in the KIP.
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to