Thanks Greg,

I apologize if we gave the impression that we did not try to address your concerns. It was not my intention to minimize them. I just don't see it your way, but that's ok. We don't have to agree. That's why we have a DISCUSS thread to begin with.

I don't know the producer internals well enough to judge how easy or difficult it would be to implement a `prepare(..)` method as proposed. If it works, I won't object to going down this route. Happy to disagree and commit.


I just don't see why `prepare()` is semantically cleaner/better, as you say, compared to a `send()` that throws an exception directly. But this might be personal preference/judgment, and it might not benefit this KIP to argue about it further.


I am wondering about interleaved calls of `prepare()` and regular `send()` though, especially given that the producer is thread-safe. Maybe there is nothing to worry about (as said, I don't know the producer internals well enough), but if this causes issues, it might not be the best way forward.


In the end it's Alieh's KIP, and it seems adding `prepare(...)` will enlarge the scope of the KIP. So it's her call whether she wants to go down this path or not.


-Matthias


On 7/22/24 12:30 PM, Greg Harris wrote:
Hi Alieh,

Yes, I think you understand my intent for the prepare() method.

Thanks,
Greg

On Mon, Jul 22, 2024 at 2:54 AM Alieh Saeedi <asae...@confluent.io.invalid>
wrote:

Hi Greg,


I appreciate your concerns and comprehensive answer.


I am not sure whether I fully understood what you meant. You mean that, in the end, the user can go for one of the following scenarios: either

1) `beginTxn()` and `send(record)` and `commitTxn()`  or

2) `beginTxn()` and `prepare(record)` and `send(prepared_record)` and
`commitTxn()` ?


Of course, the `send` in scenario 1 is different from the one in scenario 2, since part of the second one's job has already been done during `prepare()`.
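
To make the two flows concrete, here is a minimal sketch; `prepare()`, `PreparedRecord`, and the exact signatures are hypothetical and only mirror Greg's proposal further down the thread:

```
// Scenario 1: today's transactional flow; a client-side error surfaces via
// send()/commitTransaction() and poisons the transaction.
producer.beginTransaction();
producer.send(record);
producer.commitTransaction();

// Scenario 2: hypothetical prepare() flow; client-side validation happens up
// front, before the record ever becomes part of the transaction.
producer.beginTransaction();
try {
    PreparedRecord<byte[], byte[]> prepared = producer.prepare(record, null);
    producer.send(prepared);           // reuses the work already done in prepare()
} catch (RecordTooLargeException e) {
    // the record was never added to the transaction; drop it and continue
}
producer.commitTransaction();
```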


Cheers,

Alieh

On Sat, Jul 20, 2024 at 1:20 AM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

Hi Artem and Matthias,

On the other hand, the effort to prove that
keeping all records in memory won't break some scenarios (and generally
breaking one is enough to cause a lot of pain) seems to be
significantly
higher than to prove that setting some flag in some API has pretty
much 0
chance of regression

in the end, why buffer records twice?

This way we don't
ignore the error, we're just changing the method they are delivered.

Very clean semantics
which should also address the concern of "non-atomic tx"

I feel like my concerns are being minimized instead of being addressed
in this discussion, and if that's because I'm not expressing them
clearly,
I apologize.

Many users come to Kafka with prior expectations, especially when we use
industry-standard terminology like "Exactly Once Semantics",
"Transactions", "Commit", "Abort". Of course Kafka isn't an
ACID-compliant
database, but users will evaluate, discuss, and develop applications with
Kafka through the lens of the ACID principles, because that is the
framework most commonly applied to transactional semantics.
The original design of KIP-98 [1] explicitly mentions atomic commits
(with
the same meaning as the A in ACID) as the primary abstraction being added
(reproduced here):

At the core, transactional guarantees enable applications to produce to
multiple TopicPartitions atomically, ie. all writes to these
TopicPartitions will succeed or fail as a unit.
Further, since consumer progress is recorded as a write to the offsets
topic, the above capability is leveraged to enable applications to batch
consumed and produced messages into a single atomic unit, ie. a set of
messages may be considered consumed only if the entire
‘consume-transform-produce’ executed in its entirety.

I think it's important to say that to a user, "writes" really means
"send()
and commitOffsets() calls", not literal produce requests to Kafka
brokers,
and "consume-transform-produce" really means "poll(), transform, send()".
This is because to a user, the implementation within poll() and send()
and
the broker are none of their concern, and are intended to be within the
abstraction.
When I say that this feature is a non-atomic commit, I mean that this
feature does not fit the above description, and breaks the transaction
abstraction in a meaningful way. No longer do all writes succeed or fail
as
a unit, some failures are permitted to drop data. No longer must a
consume-transform-produce cycle be executed in its entirety, some parts
may
be left incomplete.
This means that this method will be difficult to define ("which
exceptions
are covered?"), difficult to document ("how do we explain
'not-really-atomic commits' clearly and unambiguously to a potential
user?"), and difficult to compose ("if someone turns this option on, how
does that affect delivery guarantees and opportunities for bugs in
upper layers?").
Users currently rely heavily on analogies to other database systems to
make
sense of Kafka's transactions, and we need to use that to our benefit,
rather than designing in spite of it being true.

However this atomicity guarantee isn't always desirable, as evidenced by
the original bug report [2]. If you're interacting with a website form
for
example, and a database transaction fails because one of your strings is
oversize, you don't need to re-input all of your form responses from
scratch, as there is an application layer/browser in-between to preserve
the state and retry the transaction.
And while you could make a convenience/performance/etc argument in that
situation ("The database should truncate/null-out the oversize string")
and
modern databases often have very expressive DML that would permit such a
behavior (try-catch, etc), the End-to-End arguments [3] make me believe
that is a bad design and should be discouraged.
To that end, I was suggesting ways to push this farther and farther up
the
stack, such as performing record size estimation. This doesn't mean that
it
can't be added at a low level of abstraction, just that we need to make
sure to exhaust all other alternatives, and justify it with a performance
benefit.

I was holding off on discussing the literal design until you provided
concrete performance justification, but to progress the discussion while
i'm waiting for that, I can give my thoughts:

I don't think an overloaded send() method is appropriate given that this
appears to be a niche use-case, and the send() method itself is probably
the single most important method in the Clients library. The KIP-98
design
was a much more substantial change to the Producer than this KIP, and it
found a way to preserve the original type signature (but added an
exception).
Users picking up the Producer for the first time may see this additional
method, and may spend time trying to understand whether it is something
suitable for their use-case. In the best case, they ignore it and use the
other two signatures. But it is also possible that they will use it
without
understanding it, and have unexpected data loss. Overall, this feels
like a
net negative to the producer user-base.
Also boolean, enum, vararg, flags, etc don't follow the current trend of
the AdminClient passing Options DTOs for modifying individual API calls,
which is a much more extensible design.
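
For reference, the AdminClient-style pattern looks roughly like the sketch below; `SendOptions` is a hypothetical DTO named only for illustration (the real AdminClient uses per-call option classes such as CreateTopicsOptions):

```
// Hypothetical Options DTO, mirroring the AdminClient per-call option classes.
public class SendOptions {
    private boolean ignoreClientSideErrors = false;

    public SendOptions ignoreClientSideErrors(boolean ignore) {
        this.ignoreClientSideErrors = ignore;
        return this;
    }

    public boolean ignoreClientSideErrors() {
        return ignoreClientSideErrors;
    }
}

// A new option can later be added to the DTO without yet another send() overload:
producer.send(record, callback, new SendOptions().ignoreClientSideErrors(true));
```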

If there was a way of preparing a record before calling send() that did some or all of the pre-send validation (serialization, size checking, maybe topic-partition existence, authorization, etc.), that could be a reasonable way to emit these sorts of errors in a way that makes it clear that sending hasn't started and the record is not part of the transaction yet. I'm imagining something like:

```
public abstract class PreparedRecord<K, V> extends ProducerRecord<K, V> { }
// Actual instantiated class and methods could be an implementation detail.

interface Producer<K, V> {
    Optional<PreparedRecord<K, V>> prepare(ProducerRecord<K, V> record, PrepareOptions options)
        throws SerializationException, RecordTooLargeException;
}

// Application code:
Optional<PreparedRecord<byte[], byte[]>> prepared = Optional.empty();
try {
    prepared = producer.prepare(record, null);
} catch (Exception e) {
    if (critical(e)) {
        producer.abortTransaction();
        return;
    }
}
if (prepared.isPresent()) {
    // errors will be propagated via commitTransaction() and also abort the transaction
    producer.send(prepared.get());
}
```

Semantically then, it would make sense that the data which has been
"prepared to send" but has not been "sent" is intentionally left out of
the
batch by the application control-flow, not an option passed into the
producer changing the control-flow within the Producer. If passed a
prepared record, the send method can then be responsible only for waiting
for sufficient buffer capacity, reusing the work done by the prepare()
method.
This prepare method could also enable other use-cases, like parallel
serialization, exception-less handling, or more advanced
buffering/balking
behavior ("refuse to prepare records that can't be sent without
blocking").
And because the synchronous processing is made explicit, it's much easier
to communicate to users which exceptions are covered and can be prevented
from causing transaction aborts.
If someone uses the method, or doesn't use it, they aren't put at risk of
compromising their delivery guarantees unexpectedly.

Thanks,
Greg

[1]


https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
[2] https://issues.apache.org/jira/browse/KAFKA-15259
[3] https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf

On Fri, Jul 19, 2024 at 12:39 PM Matthias J. Sax <mj...@apache.org>
wrote:

For catching client side errors this would work IMHO. I am ok with
this.

We throw before we add the record to the batch. Very clean semantics
which should also address the concern of "non-atomic tx"... The
exception clearly indicates that the record was not added to the TX,
and
users can react to it accordingly.

We did discuss this idea previously, but did not have a good proposal
to
make it backward compatible. The newly proposed overload would address
this issue of backward compatibility.

Of course, it might not make it easily extensible in the future for
broker side errors, but it's unclear anyway right now, if we would even
get to a solution for broker side errors or not -- so maybe it's ok to
accept this and drop/ignore the broker side error question for now.



A small follow-up thought/question: instead of using a boolean, would we actually want to make it a var-arg enum to allow users to enable this for certain errors explicitly and individually? Besides the added flexibility and fine-grained control, a var-arg enum would also make the API nicer/cleaner IMHO compared to a boolean.

For convenience, this enum could have an additional `ALL` option (and we would call out that if `ALL` is used, new error types might be added in future releases, making the code less safe/robust -- ie, use at your own risk only).

This way, we also explicitly document in the KIP what exceptions might be thrown, as we would add an enum value for each error type explicitly, and also make it future-proof for new error types we want to cover -- each addition would require a KIP to extend the enum.
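
A minimal sketch of what such a var-arg enum overload could look like; the type and constant names (`SendErrorOption`, `IGNORE_RECORD_TOO_LARGE`, etc.) are made up for illustration and are not part of the KIP:

```
// Hypothetical enum listing the client-side errors a user may opt to ignore.
public enum SendErrorOption {
    IGNORE_RECORD_TOO_LARGE,
    IGNORE_MISSING_METADATA,
    ALL   // convenience; new error types could fall under ALL in future releases
}

// Hypothetical var-arg overload on the producer:
// Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback, SendErrorOption... options);

// Usage: opt in per error type, explicitly and individually.
producer.send(record, callback, SendErrorOption.IGNORE_RECORD_TOO_LARGE);
```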



-Matthias


On 7/18/24 10:33 PM, Artem Livshits wrote:
Hey folks,

Hopefully not to make this KIP go for another spin :-), but I thought
of
a
modification that might actually address safety concerns over using
flags
to ignore a vaguely specified class of errors.

What if we had the following overload of .send method:

    void send(ProducerRecord record, Callback callback, boolean
throwImmediately)

and if throwImmediately=false, then we behave the same way as now
(return
errors via Future and poison transaction) and if
throwImmediately=true
then
we just throw errors immediately from the send function.  This way we
don't
ignore the error, we're just changing the way they are delivered.
Then
KStreams can catch the error for send(record, callback, true) and do
whatever it needs to do.
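
A rough sketch of how a caller might use this hypothetical overload (it does not exist today; this only illustrates the proposed control flow):

```
producer.beginTransaction();
try {
    // Hypothetical overload: throwImmediately=true surfaces client-side errors
    // synchronously instead of returning them via the Future and poisoning the transaction.
    producer.send(record, callback, true);
} catch (RecordTooLargeException e) {
    // The record was rejected before being added to the batch; the transaction is
    // still usable, so the application can drop the record and continue.
}
producer.commitTransaction();
```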

-Artem


On Mon, Jul 15, 2024 at 4:30 PM Greg Harris
<greg.har...@aiven.io.invalid

wrote:

Matthias,

Thank you for rejecting my suggested alternatives. Your responses
are
the
sorts of things I expected to see summarized in the text of the KIP.

I agree with most of your rejections, except this one:

"Estimation" is not sufficient, but we would need to know it
exactly.
And that's an impl detail, given that the message format could
change
and we could add new internal fields increasing the message size.

An estimate is certainly going to have an error. But an estimate shouldn't be treated as exact anyway; there should be an error bound, or "safety factor", used when interpreting it. For example, if the broker-side limit is 1MB, and an estimate could be wrong by ~10%, then computing an estimate and dropping records >900kb should be sufficient to prevent RTLEs.
This is the sort of estimation that I would expect application
developers
could implement, without knowing the exact serialization and
protocol
overhead. This could prevent user-originated oversize records from
making
it to the producer.
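
A minimal sketch of such application-side estimation, assuming the application can serialize (or approximate the serialized size of) its own keys and values; the 1MB limit and 10% safety factor mirror the example above:

```
// Drop (or reroute) records whose estimated size exceeds a conservative
// fraction of the broker-side limit (e.g. message.max.bytes).
static final int BROKER_LIMIT_BYTES = 1_000_000;
static final double SAFETY_FACTOR = 0.9;   // allow ~10% estimation error

static boolean fitsLimit(byte[] serializedKey, byte[] serializedValue) {
    int estimate = (serializedKey == null ? 0 : serializedKey.length)
                 + (serializedValue == null ? 0 : serializedValue.length);
    return estimate <= BROKER_LIMIT_BYTES * SAFETY_FACTOR;
}

// Usage: the record never reaches the producer if it is clearly oversize.
if (fitsLimit(keyBytes, valueBytes)) {
    producer.send(new ProducerRecord<>(topic, keyBytes, valueBytes));
}
```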

Thanks,
Greg


On Mon, Jul 15, 2024 at 4:08 PM Matthias J. Sax <mj...@apache.org>
wrote:

I agree with Alieh and Artem -- in the end, why buffer records
twice?
We effectively want to allow pushing some error handling (which I btw consider "business logic") into the producer. IMHO, there is nothing wrong with it. Dropping a poison pill record is not really a violation of atomicity from my POV, but a business logic decision to not
include a
record in a transaction -- the proposed API just makes it much
simpler
to achieve this business logic goal.



For memory size estimation, throughput or message size is actually
not
relevant, right? We would need to look at producer buffer size, ie,
`batch.size`, `max.in.flight.requests.per.connection` and guesstimate the
number of connections there might be? At least for KS, we don't
need
to
buffer everything until commit, but only until we get a successful
"ack"
back.

Note that KS applications not only need to write to (a single) user
result topic, but multiple output topics, as well as repartition
and
changelog topics, across all tasks assigned to a thread (ie,
producer),
which can easily be 10 tasks or more. If we assume topics with 30
partitions (topics with 50 or more partitions are not uncommon
either),
and a producer who must write to 10 different topics, the number of
required connections is very quickly very high, and thus the
required
"application buffer space" would be significant.



Your others ideas seems not to be viable alternatives:

Streams users that specifically want to drop oversize records can
estimate the size of their data and drop records which are too
large, enforcing their own limits that are lower than the Kafka
limits.

"Estimation" is not sufficient, but we would need to know it
exactly.
And that's an impl detail, given that the message format could
change
and we could add new internal fields increasing the message size.
The
idea to add some `producer.serializedRecordSize()` helper method
was
discussed, but it's a very ugly API and clumsy to use -- also, the
user
code would need to know the producer config which it might not have
access to (as it might get passed in from some config file; and it
might
also be changed).

Some other alternative we also discussed was to let `send()` throw an exception for a "record too large" case directly. However, this solution raises backward compatibility concerns, and it might also not help us
to
extend the solution in the future (eg, tackle broker side errors).
So
we
discarded this idea.



Streams users that want CONTINUE semantics can use at_least_once
semantics

Not really. EOS is mainly about not having duplicates in the
result,
but
at-least-once cannot provide this guarantee. (Even if I repeat myself:
but dropping a poison pill record based on a business logic
decision
is
not data loss, but effectively a business logic filter...)



Streams itself can store record hashes/coordinates and fast rewind
to
the end of the last transaction, recomputing data rather than
storing
it.

Given the very complex nature of topologies, with joins,
aggregations,
flatmaps etc, this is a 100x more complex solution and not viable
in
practice.



Streams can define exactly_once + CONTINUE semantics to permit the
whole
transaction to be dropped, because it would allow the next batch
to
be
started processing.

Would this not be much worse? I have a single poison pill record
and
would need to drop a full tx (this could be tens of thousands of
records...). Also, given that KS writes into changelog topics in the
same
TX, this could break the whole application.



Streams can emit records with both a transactional and
non-transactional
producer if some records are not critical-path

We (1) already have a "too many connections" problem with KS, so using more clients is something we try to avoid (and we actually hope to reduce the number of clients and connections mid to long term), (2)
this
would be very hard to express at the API level to the user, and (3)
it
would provide very weird semantics.



they should optimize for smaller transactions,

IMHO, this would not work in practice because transactions have a high overhead and the commit interval is used to trade off throughput vs end-to-end latency. Given a certain throughput requirement, it would
not
be possible to just use a lower commit interval to reduce memory
requirements.



-Matthias




On 7/15/24 2:25 PM, Artem Livshits wrote:
Hi Greg,

This makes me think that this IGNORE_SEND_ERRORS covers an
arbitrary
set
of error conditions that may be expanded in the future, possibly
to
cover
the broker side RecordTooLargeException.

I don't think it contradicts what I said (the keyword here is "in
the
future") -- with the current functionality, the correct way to
handle
RTLE
is by only letting the client ignore client-originated RTLE (this
can
be
easily implemented on the client side).  In the future, we can
improve
on
that by making the broker return a different exception for
batch-too-large
case, then the producer would be able to return broker side
exceptions
as
well (and if the application chooses to ignore it -- it will be
able
to,
but it would be an explicit choice rather than ignoring it by
mistake),
in
this case the producer client would encapsulate backward
compatibility
logic when it connects to older brokers to make sure the application doesn't accidentally get an RTLE originated by the old broker.  This
functionality is obviously more involved and we'll need to see if
going
all
the way is justified, but the partial client-only solution doesn't
close
the door.

So one way to look at the current situation is the following:

1. We can do a low effort partial solution to solve a real
existing
problem.  We can easily prove that it would do exactly what it
needs
to
do
with minimal risk of regression.
2. We have a path to a more comprehensive solution, so if we
justify
the
effort required for that, we can get there.

BTW, as a side note (I think a saw a question in the thread), we
do
try
to
introduce error categories here





https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions
so eventually we may have a better classification for the errors.

"if a streams producer is producing 1MB/s, and the commit
interval
is
1
hour, I expect 3600MB of additional heap needed ...

Agree, that would be ideal.  On the other hand, the effort to
prove
that
keeping all records in memory won't break some scenarios (and
generally
breaking one is enough to cause a lot of pain) seems to be
significantly
higher than to prove that setting some flag in some API has pretty
much 0
chance of regression (we basically have a flag to say "unfix
KAFKA-9279"
so
we're getting to fairly "known good" state).  I'll let KStream
folks
comment on this one (and we still need to solve the problem of
accidental
handling of RTLE originated from broker, so some KIP would be
required
to
somehow help to differentiate those).

-Artem

On Mon, Jul 15, 2024 at 1:31 PM Greg Harris
<greg.har...@aiven.io.invalid

wrote:

Hi Artem,

Thank you for clarifying as I'm joining the conversation late and
may
have
some misconceptions.

Because of this, a more "complete" solution that
allows ignoring RecordTooLargeException regardless of its origin
is
actually incorrect, while a "partial" solution that allows
ignoring
RecordTooLargeException only originating in client code
accomplishes
the
required functionality.

This is not how I understood this feature. Above Matthias said
the
following:

We can do
follow up KIP for other errors on an on-demand basis and
fix-forward
/
enlarge the scope successively.

This makes me think that this IGNORE_SEND_ERRORS covers an
arbitrary
set of
error conditions that may be expanded in the future, possibly to
cover
the
broker side RecordTooLargeException.

Obviously, we could solve this problem by changing logic in the
broker to return a different error when the batch is too large,
but
right
now this is not the case

If the broker/wire protocol isn't ready for these errors to be
propagated,
then I don't think we're ready to add this API. It's going to be
under-generalized, and there's a decent chance that we're going
to
regret
the design choices in the future. And users that expect it to be
fully
generalized are going to be disappointed when they don't read the
fine
print and still get faulted by non-covered errors.

AL2.  In a high performance system, "just an optimization" can
be
a
functional requirement ...
    I just wanted to make the point that we shouldn't necessarily
dismiss
API changes that allow for optimizations.

My earlier statement didn't dismiss this feature as "just an
optimization",
actually the opposite. I said that performance could be a
justification,
but only if it is quantified and stated explicitly. We shouldn't
be
voting
on hand-wavy optimizations, we should be voting on things that
are
quantifiable.
For example an analysis like the following would facilitate
further
discussion: "if a streams producer is producing 1MB/s, and the
commit
interval is 1 hour, I expect 3600MB of additional heap needed per
producer". We can then discuss whether we expect higher or lower
throughput, commit intervals, or heap usage to determine what the
operating
envelope of this feature could be.
If there are a substantial number of users that have high
throughput,
long
commit intervals, _and_ RTLEs, then this feature could make
sense.
If
not,
then the downsides of this feature (complication of the API,
under-specification of the error coverage, etc) look unjustified.
In
fact,
if the number of users regularly encountering RTLEs is
sufficiently
small,
I would strongly advocate for an application-specific workaround
instead of
trying to fix this in Streams, or make memory buffering an
optional
feature
in streams.

Thanks,
Greg

On Mon, Jul 15, 2024 at 1:29 PM Greg Harris <
greg.har...@aiven.io>
wrote:

Hi Alieh,

Thanks for your response.

what does a user do
after a transaction has failed due to a `too-large-record` exception?
They
will submit the same batch without the problematic record
again.

If they re-submit the same record, they are indicating that this
record
is
an integral part of the transaction, and the transaction should
only
be
committed with it present. If the record isn't integral to the
transaction,
they shouldn't submit it as part of the transaction.

Regarding your solution to solve the issue application-side:  I
am
a
bit hesitant to keep all sent records in memory since I think
buffering
records twice (both in Streams and Producer) would not be an
efficient
solution.

I understand your hesitation, and this touches on the
"performance"
caveat
of the end-to-end arguments in system design. There are no
perfect
designs,
and some API cleanliness may be sacrificed in favor of more
performant
solutions. You would need to make a concrete and convincing
argument
that
the performance of this solution would be better than every
alternative.
To
that end, I would recommend that you add more to the "Rejected
Alternatives" section, as that is going to carry this proposal.
Some alternatives that I can think of, but which aren't
necessarily
better:
1. Streams users that specifically want to drop oversize records
can
estimate the size of their data and drop records which are too
large, enforcing their own limits that are lower than the Kafka
limits.
2. Streams users that want CONTINUE semantics can use
at_least_once
semantics
3. Streams itself can store record hashes/coordinates and fast
rewind
to
the end of the last transaction, recomputing data rather than
storing
it.
4. Streams can define exactly_once + CONTINUE semantics to
permit
the
whole transaction to be dropped, because it would allow the next
batch
to
be started processing.
5. Streams can emit records with both a transactional and
non-transactional producer if some records are not critical-path

To generalize this point: Suppose an application tries to
minimize
storage
costs by having only one party responsible for a piece of data
at
a
time.
They initially have the data, call send(), and want to know the
earliest
time they can forget the data and transfer the responsibility to
Kafka.
With a non-transactional producer, they are responsible for the
data
until
the send() callback has succeeded. With a transactional
producer,
they
are
responsible for the data until commitTransaction() has
succeeded.
With this proposed change that makes the producer tolerate
too-large-exceptions, applications are still responsible for
storing
their
data until commitTransaction() has succeeded, because
abortTransaction()
could have also been called, or the producer could have been
fenced,
or
any
number of other failures could have occurred. This feature does
not
enable
Streams to drop responsibility earlier, it carves out a specific
situation
in which it doesn't have to rewind processing, which is a
performance
concern.

For Streams and the general case, if an application is trying to
optimize
storage costs, they should optimize for smaller transactions,
because
this
both lowers the bound on record re-delivery and lowers the
likelihood
of
a
bad record being included in any individual transaction.

Thanks,
Greg

On Mon, Jul 15, 2024 at 12:35 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

Hi Greg,

What you say makes a lot of sense.  I just wanted to clarify a
couple
of
subtle points.

AL1. There is a functional reason to handle errors that happen
on
send
(originate in the producer logic in the client) vs. errors that
are
returned
from the broker.  The problem is that RecordTooLargeException
is
returned
in two cases: (1) the producer logic on the client checks that
record
is
too large and throws the exception before doing anything with
this
--
this
is a very "clean" situation with one specific record being marked
as
"poison
pill" and rejected; (2) the broker throws the same error if the
batch
is
too large -- the batch may include multiple records and none of
them
would
necessarily be a "poison pill" record, it's just a random
misconfiguration
of client vs. broker.  Because of this, a more "complete"
solution
that
allows ignoring RecordTooLargeException regardless of its
origin
is
actually incorrect, while a "partial" solution that allows
ignoring
RecordTooLargeException only originating in client code
accomplishes
the
required functionality.  This is an important nuance and should
be
added
to
the KIP.  Obviously, we could solve this problem by changing
logic
in
the
broker to return a different error when the batch is too large,
but
right
now this is not the case (and to have the correct error
handling
we'd
need
to know the version of the broker so we can only drop the
records
if
the
error is returned from a broker that knows to return a
different
error).

AL2.  In a high performance system, "just an optimization" can
be a
functional requirement -- if a solution impacts memory or
computational
complexity (in the sense of bigO notation) on the main code
path
I
can
justify changing APIs to avoid such an impact.  I'll let
KStream
folks
comment on whether an implementation that requires storing
records
in
memory actually violates the computational complexity on the
main
code
path, I just wanted to make the point that we shouldn't
necessarily
dismiss
API changes that allow for optimizations.

-Artem

On Fri, Jul 12, 2024 at 1:07 PM Greg Harris
<greg.har...@aiven.io.invalid

wrote:

Hi all,

Alieh, thanks for the KIP! And everyone else, thanks for the
robust
discussion.

I understand that there are situations in which users desire
that
the
pipeline "just keep working" and skip errors. However, I
question
whether
it is appropriate to support/encourage this behavior via
inclusion
in
the
Producer API.
This feature is essentially a "non-atomic transaction", as it
allows
commits in which not all records passed to send() ultimately
get
committed.
As atomicity is one of the most important semantics associated
with
transactions, I question whether there are users other than
Streams
that
would choose non-atomic transactions over a
traditional/idempotent
producer.
Some cursory research shows that non-atomic transactions may
be
present
in
other databases, but is actively discouraged due to the
complexity
they
add
to error-handling. [1]

I'd like to invoke the End-to-End Arguments in System Design
[2]
here,
and
recommend that this behavior may be present in Streams, but
should
not
be
in the Producer.
1. Dropping records that cause errors is already expressible via the current Producer API. You can store the records in-memory after calling send(), wait for a successful no-error flush() before calling commitTransaction(), and then allow the records to be garbage collected. If errors occur, abortTransaction() and re-submit the records (see the sketch after this list).
2. Implementing this inside the Producer API is complex and
difficult
to
holistically define in a way that we won't regret or need to
change
later.
I think some of the disagreement in this thread originates
from
this,
and I
don't find the proposed API satisfactory.
3. The performance improvement of including this change in the
lower
level
needs to be quantified in order to be a justification, and I
don't
see
any
analysis about this.
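
A rough sketch of the alternative described in (1), assuming the application keeps its own per-transaction list of records; the retry policy and exact error handling are left to the application:

```
List<ProducerRecord<byte[], byte[]>> pending = new ArrayList<>();
List<Future<RecordMetadata>> results = new ArrayList<>();
producer.beginTransaction();
for (ProducerRecord<byte[], byte[]> r : records) {
    pending.add(r);                  // application stays responsible for the data
    results.add(producer.send(r));
}
producer.flush();                    // make sure all sends have completed
boolean failed = false;
for (Future<RecordMetadata> f : results) {
    try {
        f.get();                     // check each send for errors
    } catch (Exception e) {
        failed = true;
    }
}
if (!failed) {
    producer.commitTransaction();
    pending.clear();                 // responsibility handed over to Kafka
} else {
    producer.abortTransaction();
    // re-submit `pending` (possibly minus the poison pill) in a new transaction
}
```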

I imagine that the alternative implementation I suggested in
(1)
would
also
enable more expressive error handlers in Streams, if such a
thing
was
desired. Keeping the record around until after the transaction
is
committed
would enable a DLQ or passing the erroneous record to the
error
handler.

I think that the current pattern of the application being
responsible
for
providing good data to the producer is very reasonable; Having
the
producer
responsible for implementing the application's error handling
of
bad
data
is not something I can support.

Thanks,
Greg

[1] https://www.sommarskog.se/error_handling/Part1.html
[2]

https://web.mit.edu/Saltzer/www/publications/endtoend/endtoend.pdf

On Fri, Jul 12, 2024 at 8:52 AM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

Can we update the KIP to clearly document these decisions?

Thanks,

Justine

On Tue, Jul 9, 2024 at 9:25 AM Andrew Schofield <
andrew_schofi...@live.com

wrote:

Hi Chris,
As it stands, the error handling for transactions in
KafkaProducer
is
not
ideal. There’s no reason why a failed operation should fail
a
transaction
provided that the application can tell that the operation
was
not
included
in the transaction and then make its own decision whether to
continue
or
back out. So, I think I disagree with the original premise
of
a
client-side
error state for a transaction, but we are where we are.

When I voted, I did not expect the KIP to handle ALL errors
which
could
conceivably be handled. I did expect it to handle
client-side
send
errors
that would cause a record to be rejected from a batch before
sending
to a
broker. I think that it does make the KafkaProducer
interface
very
slightly
more complicated, but the new option is a clear improvement
and
I
don’t see anyone getting into a mess using it.

I think broker-side errors are more tricky and I don’t think
an
overload
on the send() method is going to do the job. I don’t see
that
as
a
problem
with the KIP, just that the underlying RPCs and behaviour is
not
very
amenable to record-specific error handling. The Produce RPC
is a
complicated beast which can include a set of records for
mutiple
topic-partitions. Although ProduceResponse v10 does include
record
errors, I don’t believe this is surfaced in the client.
Let’s
imagine
something
like broker-side record validation which barfs on one
record.
Failing
an
entire batch is easier, but less useful if the problem is
related
to
one
record.

In summary, I’m happy that my vote stands, and I am happy
with
the
KIP
only supporting client-side record errors.

Thanks,
Andrew

On 8 Jul 2024, at 16:37, Chris Egerton
<chr...@aiven.io.INVALID

wrote:

Hi Alieh,

Can you clarify why broker-side errors shouldn't be
covered?
The
only
real
rationale I can come up with is that it's easier to
implement.

"Things were better for Kafka Streams before KAFKA-9279 was
fixed"
isn't
very convincing, because Kafka Streams is not the only user
of
the
Java
producer client. And for others, especially new users, I
doubt
that
this
new API we're proposing would make sense without having to
consult a
lot
of
historical context.

I also don't think that most users will know or even care
about
the
distinction between errors that cause a record to fail
before
it's
added
to
a batch vs. after. If you were writing a producer
application
of
your
own,
and you wanted to handle RecordTooLargeException instances
by
dropping
a
record without aborting a transaction, would you care about
whether
it
was
your client or your broker that balked? Would you be happy
if
you
wrote
logic expecting that that problem was solved once and for
all,
only
to
learn that it could still affect you in other
circumstances?
Or,
alternatively, would you be happy if you wanted to solve
that
problem
and
found an API that seemed to do exactly what you wanted, but
after
reading
the fine print, realized you'd have to do it yourself
instead?

Ultimately, the more I think about this, the more I believe
that
we're
adding noise to the API (with the new overloaded variant of
send)
for a
feature that will likely bring confusion and even
frustration
to
anyone
besides maintainers of Kafka Streams who tries to use it.

If the only concern about covering broker-side errors is
that
it
would
be
more difficult to implement, I believe we should strongly
reconsider
that
alternative. That said, if there is a straightforward way
to
explain
this
feature to new users that won't mislead them or require
them
to
do
research
on producer internals, then I can still live with it.

Regarding a list of recoverable vs. irrecoverable errors,
this
is
actually
the subject of another recently-introduced KIP:










https://cwiki.apache.org/confluence/display/KAFKA/KIP-1050%3A+Consistent+error+handling+for+Transactions

Finally, I'd also like to ask the people who have already
voted
(Andrew,
Matthias) if, at the time they voted, they believed that
the
API
would
handle all errors, or only the subset of errors that would
cause a
record
to be rejected from a batch before it can be sent to a
broker.

Best,

Chris

On Thu, Jul 4, 2024 at 12:43 PM Alieh Saeedi
<asae...@confluent.io.invalid>
wrote:

Salut from the KIP’s author


Clarifying two points:


1) broker side errors:

As far as I remember we are not going to cover the errors
originating
from
the broker!

A historical fact: One of the debate points in KIP-1038
was
that
by
defining a producer custom handler, the user may assume
that
broker-side
errors must be covered as well. They may define a handler
for
handling
`RecordTooLargeException` and still see such errors not
being
handled
as
they wish.


2) Regarding irrecoverable/recoverable errors:

Before the fix of `KAFKA-9279`,  errors such as
`RecordTooLargeException`
or errors related to missing meta data (both originating
from
Producer
`send()`) were considered as recoverable but after that
they
turned
into
being irrecoverable without changing any Javadocs or
having
any
KIP.
All
the effort made in this KIP and the former one have been
towards
returning
to the former state.


I am sure that it is clear for you that which sort of
errors
we
are
going
to cover: A single record may happen to NOT get added to
the
batch
due
to
the issues with the record or its corresponding topic. The
point
was
that
if the record is not added to the batch let ’s don’t fail
the
whole
batch
because of that non-existing record. We never intended to
do
sth
in
broker
side or ignore more important errors.  But I agree with
you
Chris.
If
we
are adding a new API, we must have good documentation for
that.
The
sentence `all irrecoverable transactional errors will
still
be
fatal`
as
you suggested is good. What do you think? I am totally
against
enumerating
errors in Javadocs since these sort of errors can be
changing
during
time.  More
over, have you ever seen any list of recoverable or
irrecoverable
errors
somewhere so far?


Bests,

Alieh

On Wed, Jul 3, 2024 at 6:07 PM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Hi Justine,

I agree that enumerating a list of errors that should be
covered by
the
KIP
is difficult; I was thinking it might be easier if we
list
the
errors
that
should _not_ be covered by the KIP, and only if we can't
define
a
reasonable heuristic that would cover them without having
to
explicitly
list them. Could it be enough to say "all irrecoverable
transactional
errors will still be fatal", or even just "all
transactional
errors
(as
opposed to errors related to this specific record) will
still
be
fatal"?

Cheers,

Chris

On Wed, Jul 3, 2024 at 11:56 AM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

Hey Chris,

I think what you say makes sense. I agree that defining
the
behavior
based
on code that can possibly change is not a good idea,
and I
was
trying
to
get a clearer definition from the KIP's author :)

I think it can always be hard to ensure that only
specific
errors
are
handled unless they are explicitly enumerated in code as
the
code
can
change and can be changed by folks who are not aware of
this
KIP
or
conversation.
I personally don't have the bandwidth to do this
definition/enumeration
of
errors, so hopefully Alieh can expand upon this.

Justine

On Wed, Jul 3, 2024 at 8:28 AM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Hi Alieh,

I don't love defining the changes for this KIP in terms
of
a
catch
clause
in the KafkaProducer class, for two reasons. First, the
set
of
errors
that
are handled by that clause may shift over time as the
code
base
is
modified, and second, it would be fairly opaque to
users
who
want
to
understand whether an error would be affected by using
this
API
or
not.

It also seems strange that we'd handle some types of
RecordTooLargeException (i.e., ones reported
client-side)
with
this
API,
but not others (i.e., ones reported by a broker).

I think this kind of API would be most powerful, most
intuitive
to
users,
and easiest to document if we expanded the scope to all
record-send-related
errors, except anything indicating issues with
exactly-once
semantics.
That
would include records that are too large (when caught
both
client-
and
server-side), records that can't be sent due to
authorization
failures,
records sent to nonexistent topics/topic partitions,
and
keyless
records
sent to compacted topics. It would not include
ProducerFencedException, InvalidProducerEpochException,
UnsupportedVersionException,
and possibly others.

@Justine -- do you think it would be possible to
develop
either a
better
definition for the kinds of "excluded" errors that
should
not
be
covered
by
this API, or, barring that, a comprehensive list of
exact
error
types?
And
do you think this would be acceptable in terms of risk
and
complexity?

Cheers,

Chris

On Tue, Jul 2, 2024 at 5:05 PM Alieh Saeedi
<asae...@confluent.io.invalid

wrote:

Hey Justine,

About the consequences: the consequences will be like
when
we
did
not
have
the fix made in `KAFKA-9279`: silent loss of data!
Obviously,
when
the
user
intentionally chose to ignore errors, that would not
be
silent
any
more.
Right?
Of course, considering all types of `ApiException`s
would
be
too
broad.
But
are the exceptions caught in `catch(ApiException e)`
of
the
`doSend()`
method also too broad?

-Alieh

On Tue, Jul 2, 2024 at 9:45 PM Justine Olshan
<jols...@confluent.io.invalid

wrote:

Hey Alieh,

If we want to allow any error to be ignored we should
probably
run
through
all the errors to make sure they make sense.
I just want to feel confident that we aren't just
making
a
decision
without
considering the consequences carefully.

Justine

On Tue, Jul 2, 2024 at 12:30 PM Alieh Saeedi
<asae...@confluent.io.invalid

wrote:

Hey Justine,

yes we talked about `RecordTooLargeException` as an
example,
but
did
we
ever limit ourselves to only this specific
exception?
I
think
neither
in
the KIP nor in the PR.  As Chris mentioned, this KIP
is
going
to
undo
what
we have done in `KAFKA-9279` in case 1) the user is
in a
transaction
and
2)
he decides to ignore the errors in which the record
was
not
even
added
to
the batch. Yes, and we suggested some methods for
undoing
or,
in
fact,
moving back the transaction from the error state in
`flush` or
in
`commitTnx` and we finally came to the idea of not
even
doing
the
changes
(better than undoing) in `send`.

Bests,
Alieh

On Tue, Jul 2, 2024 at 8:03 PM Justine Olshan
<jols...@confluent.io.invalid

wrote:

Hey folks,

I understand where you are coming from by asking
for
specific
use
cases.
My
understanding based on previous conversations was
that
there
were a
few
different errors that have been seen.
One example I heard some information about was when
the
record
was
too
large and it fails the batch. Besides that, I'm not
really
sure
if
there
are cases in mind, though it is fair to ask on
those
and
bring
them
up.

Does a record qualify as a poison pill if it
targets a
topic
that
doesn't exist? Or if it targets a topic that the
producer
principal
lacks
ACLs for? What if it fails broker-side validation
(e.g.,
has
a
null
key
for
a compacted topic)?

I think there was some parallel work with
addressing
the
UnknownTopicOrPartitionError in another way. As for
the
other
checks,
acls,
validation etc. I am not aware of that being in
Alieh's
scope,
but
we
should be clear about exactly what we are doing.

All errors that fall into ApiException seems too
broad
to
me.

Justine

On Tue, Jul 2, 2024 at 10:51 AM Alieh Saeedi
<asae...@confluent.io.invalid

wrote:

Hey Chris,
thanks for sharing your concerns.

1) About the language of KIP (or maybe later in
Javadocs):
Is
that
alright
if I write all errors that fall into the
`ApiException`
category
thrown
(actually returned) by Producer?
2) About future expansion: do you have any better
suggestions
for
enum
names? Do you think `IGNORE_API_EXEPTIONS` or
something
like
that
is
a
"better/more accurate" one?

Bests,
Alieh

On Tue, Jul 2, 2024 at 7:29 PM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Hi Alieh and Justine,

I'm concerned that we're settling on a definition
of
"poison
pill"
that's
easiest to tackle right now but may lead to
shortcomings
down
the
road. I
understand the relationship between this KIP and
KAFKA-9279,
and
I
can
totally get behind the desire to keep things
small,
focused,
and
simple
in
the name of avoiding bugs. However, what I don't
think
is
clear
at
all
is
what the "specific circumstances" are that
Justine
mentioned. I
had a
drastically different idea of what the intended
behavioral
change
would
be
before looking at the draft PR.

I would like 1) for us to be clearer about the
categories
of
errors
that
we
want to cover with this new API (especially since
we'll
have
to
find
a
clear, succinct way to document this for users),
and
2)
to
make
sure
that
if we do try to expand this API in the future,
that
we
won't
be
painted
into a corner.

For item 1, hopefully we can agree that the
language
in
the
KIP
for IGNORE_SEND_ERRORS ("The records causing
irrecoverable
errors
are
excluded from the batch and the transaction is
committed
successfully.")
is
pretty vague. If we start using the phrase
"poison
pill
record"
that
could
help, but IMO more detail would still be needed.
We
know
that
we
want
to
include records that are so large that they can
be
immediately
rejected
by
the producer. But there are other cases that
users
might
expect
to
be
handled. Does a record qualify as a poison pill
if
it
targets a
topic
that
doesn't exist? Or if it targets a topic that the
producer
principal
lacks
ACLs for? What if it fails broker-side validation
(e.g.,
has
a
null
key
for
a compacted topic)?

For item 2, this really depends on how narrow the
scope
of
what
we're
doing
right now is. If we only handle a subset of the
examples
I
laid
out
above
that could possibly be considered poison pills
with
this
KIP,
do
we
want
to
lock ourselves in to never addressing more in the
future,
or
can
we
choose
an API (probably just enum names would be the
only
important
decision
here)
that leaves room for more later?

Best,

Chris



On Tue, Jul 2, 2024 at 12:28 PM Justine Olshan
<jols...@confluent.io.invalid>
wrote:

Chris and Alieh,

My understanding is that this KIP is really only
trying
to
solve
an
issue
of a "poison pill" record that fails send().
We've talked a lot about having a generic
framework
for
all
errors,
but I
don't think that is what this KIP is trying to
do.
Essentially
the
request
is to undo the change from KAFKA-9279
<
https://issues.apache.org/jira/browse/KAFKA-9279

but
under
specific
circumstances that are controlled. I really am
concerned
about
opening
new
avenues for bugs with EOS and hesitate to handle
any
other
types
of
errors.
I think if we all agree on the problem that we
are
trying
to
solve,
it
is
easier to agree on solutions.

Justine

On Mon, Jul 1, 2024 at 2:20 AM Alieh Saeedi
<asae...@confluent.io.invalid

wrote:

Hi Matthias,
Thanks for the valid points you mentioned. I
updated
the
KIP
and
the
PR
with:
1) mentioning that the new overloaded `send`
throws
`IllegalStateException`
if the user tries to ignore `send()` errors
outside
of
a
transaction.
2) the default implementation in `Producer`
interface
throws
an
`UnsupportedOperationException`

Hi Chris,
Thanks for the feedback. I tried to clarify the
points
you
listed:
-------> we've narrowed the scope from any
error
that
might
take
place
with
producing a record to Kafka, to only the ones
that
are
thrown
directly
from
Producer::send;

   From the very beginning and even since
KIP-1038,
the
main
purpose
was
to
have "more flexibility," or, in other words,
"giving
the
user
the
authority" to handle some specific exceptions
thrown
from
the
`Producer`.
Due to the specific cases we had in mind,
KIP-1038
was
discarded
and
we
decided to not define a
`CustomExceptionHandler`
for
`Producer`
and
instead
treat the `send` failures in a different way.
The
main
issue
is
that
`send`
makes a transition to error state, which is
undoable.
In
fact,
one
single
poison pill record makes the whole batch fail.
The
former
suggestions
that
you agreed with have been all about un-doing
this
transition
in
`flush`
or
`commit`. The new suggestion is to un-do (or
better,
NOT
do)
in
`send`
due
to the reasons listed in the discussions above.
Moreover, I would say that having such a large
scope
as
you
mentioned
is
impossible. In the best case, we may have
control
over
the
`Producer`.
What
shall we do with the broker? The `any error
that
might
take
place
with
producing a record to Kafka` is too much, I
think.

-------> is this all we want to handle, and
will
it
prevent
us
from
handling more in the future in an intuitive
way?

I think yes. This is all we want. Other sorts
of
errors
such
as
having
problem with partition addition, producer
fenced
exception,
etc
seem
to
be
more serious issues. The intention was to
handle
problems
created
by
(maybe) a single poison pill record. BTW, I do
not
see
any
obstacles
to
future changes.

Bests,
Alieh

On Sat, Jun 29, 2024 at 3:03 AM Chris Egerton
<chr...@aiven.io.invalid

wrote:

Ah, sorry--spoke too soon. The PR doesn't show
that
errors
thrown
from
Producer::send are handled, but instead,
ApiException
instances
that
are
caught inside KafkaProducer::doSend and are
handled
by
returning
an
already-failed future are. I think the same
question
still
applies
(is
this
all we want to handle, and will it prevent us
from
handling
more
in
the
future in an intuitive way), though.

On Fri, Jun 28, 2024 at 8:57 PM Chris Egerton
<
chr...@aiven.io

wrote:

Hi Alieh,

This KIP has evolved a lot since I last
looked
at
it,
but
the
changes
seem
well thought-out both in semantics and API.
One
clarifying
question I
have
is that it looks based on the draft PR that
we've
narrowed
the
scope
from
any error that might take place with
producing
a
record
to
Kafka,
to
only
the ones that are thrown directly from
Producer::send;
is
that
the
intended
behavior here? And if so, do you have
thoughts
on
how
we
might
design a
follow-up KIP that would catch all errors
(including
ones
reported
asynchronously instead of synchronously)? I'd
like
it
if
we
could
leave
the
door open for that without painting ourselves
into
too
much
of
a
corner
with the API design for this KIP.

Cheers,

Chris

On Fri, Jun 28, 2024 at 6:31 PM Matthias J.
Sax <
mj...@apache.org>
wrote:

Thanks Alieh,

it seems this KIP can just pick between a
couple
of
tradeoffs.
Adding
an
overloaded `send()` as the KIP propose makes
sense
to
me
and
seems
to
provides the cleanest solution compare to
there
options
we
discussed.

Given the explicit name of the passed-in
option
that
highlights
that
the
option is for TX only make is pretty clear
and
avoids
the
issue
of
`flush()` ambiguity.


Nit: We should make clear on the KIP though,
that
the
new
`send()`
overload would throw an
`IllegalStateException`
if
TX
are
not
used
(similar to other TX methods like initTx(),
etc)


About the `Producer` interface, I am not
sure
how
this
was
done
in
the
past (eg, KIP-266 added
`Consumer.poll(Duration)`
w/o
a
default
implementation), if we need a default
implementation
for
backward
compatibility or not? If we do want to add
one,
I
think
it
would
be
appropriate to throw an
`UnsupportedOperationException`
by
default,
instead of just keeping the default impl
empty?


My points are rather minor, and should not
block
this
KIP
though.
Overall LGTM.



-Matthias

On 6/27/24 1:28 PM, Alieh Saeedi wrote:
Hi Justine,

Thanks for the suggestion.
Making applications to validate every
single
record
is
not
the
best
way,
from an efficiency point of view.
Moreover, between changing the behavior of
the
Producer
in
`send`
and
`commitTnx`, the former seems more
reasonable
and
clean.

Bests,
Alieh

On Thu, Jun 27, 2024 at 8:14 PM Justine
Olshan
<jols...@confluent.io.invalid>
wrote:

Hey Alieh,

I see there are two options now. So folks
will
be
discussing
the
approaches
and deciding the best way forward before
we
vote?
I do think there could be a problem with
the
approach
on
commit
if
we
get
stuck on an earlier error and have more
records
(potentially
on
new
partitions) to commit as the current PR is
implemented.

I guess this takes us back to the question
of
whether
the
error
should
be
cleared on send.

(And I guess at the back of my mind, I'm
wondering
if
there
is
a
way
we can
validate the "posion pill" records
application
side
before
we
even
try
to
send them)

Justine

On Wed, Jun 26, 2024 at 4:38 PM Alieh
Saeedi
<asae...@confluent.io.invalid

wrote:

Hi Justine,

I did not update the KIP with
`TxnSendOption`
since
I
thought
it'd
be
better discussed here beforehand.
right now, there are 2 PRs:
- the PR that implements the current
version
of
the
KIP:

https://github.com/apache/kafka/pull/16332
- the POC PR that clarifies the
`TxnSendOption`:

https://github.com/apache/kafka/pull/16465

Bests,
Alieh

On Thu, Jun 27, 2024 at 12:42 AM Justine
Olshan
<jols...@confluent.io.invalid> wrote:

Hey Alieh,

I think I am a little confused. Are the
3
points
above
addressed
by
the
KIP
or did something change? The PR seems to
not
include
this
change
and
still
has the CommitOption as well.

Thanks,
Justine

On Wed, Jun 26, 2024 at 2:15 PM Alieh
Saeedi
<asae...@confluent.io.invalid

wrote:

Hi all,


Looking at the PR <
https://github.com/apache/kafka/pull/16332

corresponding to the KIP, there are
some
points
worthy
of
mention:


1) clearing the error ends up
dirty/messy
code
in
`TransactionManager`.

2) By clearing the error, we are
actually
doing
an
illegal
transition
from
`ABORTABLE_ERROR` to `IN_TRANSACTION`
which
is
conceptually
not
acceptable.
This can be the root cause of some
issues,
with
perhaps
further
future
changes by others.

3) If the poison pill record `r1`
causes
a
transition
to
the
error
state
and then the next record `r2` requires
adding
a
partition
to
the
transaction, the action fails due to
being
in
the
error
state.
In
this
case, clearing errors during
`commitTnx(CLEAR_SEND_ERROR)`
is
too
late.
However, this case can NOT be the main
concern
as
soon
as
KIP-890
is
fully
implemented.


My suggestion is to solve the problem
where
it
arises.
If
the
transition
to
the error state does not happen during
`send()`,
we
do
not
need
to
clear
the error later. Therefore, instead of
`CommitOption`,
we
can
define
a
`TxnSendOption` and prevent the
`send()`
method
from
going
to
the
error
state in case 1) we're in a transaction
and
2)
the
user
asked
for
IGONRE_SEND_ERRORS. For more clarity,
you
can
take a
look
at
the
POC
PR
<
https://github.com/apache/kafka/pull/16465
.

Cheers,
Alieh



































