lucasbru commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782544764
Changes look good to me. Did I understand David correctly that we want to
add more changes to this PR?
> 1. Mockito: Could you be more specific on how you expect to mock the
> response object?
That was just "out of interest": since you are already using Mockito to mock
things, why not use it for the response object as well? But I realize that the
object is a pretty plain data container, so I guess the answer is that
constructing it directly is easier.
> 2. Error handling: Essentially all errors in
> `continueHandlePartitionErrors` can only happen in the response. I understand
> there is some redundancy there and it can be a bit confusing. But the response
> can either be a hard failure (response = null and throwable = non-null) or a
> server-side error (response = non-null). That is why it was kept separated.
> If you find it unclear - how do I make it more readable?
That's not what I meant. I just found the flow of `onResponse` a bit hard to
read, so I suggested structuring it a bit differently. In particular, the name
`continueHandlePartitionErrors` sounds like it's determining whether to
continue handling errors, but it actually completes the future inside.
I think the control flow may also lead to logging in a way that may not be
intended? For fencing, we get one generic `log.error` message and one very
similar `log.info` message with some specific details. For topic authorization
errors, we get one `log.error` message for each partition, and then another
aggregated `log.error` message, listing the partitions again. However, if we
have 3 unauthorized topic errors and then one other error, we get three generic
error messages for the topics, but we do not get the aggregated error message.
Is that trying to be consistent with the old consumer?
I imagined something like:
```
if (error == Errors.NONE) {
    log.debug("OffsetCommit {} for partition {}", offset, tp);
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
    // Collect all unauthorized topics before failing
    unauthorizedTopics.add(tp.topic());
} else if (error.exception() instanceof RetriableException) {
    // Errors is an enum, so check the exception type it maps to
    log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp,
        offset, error.message());
    handleRetriableError(error);
    retry(responseTime);
    return;
} else {
    log.error("OffsetCommit failed on partition {} at offset {}: {}", tp,
        offset, error.message());
    handleFatalError(error);
    return;
}
```
But I'm not super familiar with the code style in the new consumer, and
consistency with the rest of the code and the old consumer is also important.
So I just wanted to offer this as inspiration; you are probably in the best
position to decide how to implement it.
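For illustration only, here is a minimal self-contained sketch of that flow, including the aggregated handling of unauthorized topics after the loop. Everything in it is a hypothetical stand-in rather than code from the PR or from Kafka: `Err` replaces Kafka's `Errors` enum, partitions are plain `"topic-partition"` strings, and log output is collected in a list so the behavior can be inspected.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CommitResponseSketch {
    /** Hypothetical stand-in for Kafka's Errors enum; only the retriable flag matters here. */
    enum Err {
        NONE(false),
        TOPIC_AUTHORIZATION_FAILED(false),
        COORDINATOR_LOAD_IN_PROGRESS(true),
        FENCED_INSTANCE_ID(false);

        final boolean retriable;

        Err(boolean retriable) {
            this.retriable = retriable;
        }
    }

    /** Hypothetical summary of what the handler decided for one response. */
    enum Outcome { SUCCESS, RETRY, FATAL, UNAUTHORIZED }

    static final class Result {
        final Outcome outcome;
        final Set<String> unauthorizedTopics;
        final List<String> logLines;

        Result(Outcome outcome, Set<String> unauthorizedTopics, List<String> logLines) {
            this.outcome = outcome;
            this.unauthorizedTopics = unauthorizedTopics;
            this.logLines = logLines;
        }
    }

    /** Keys are "topic-partition" strings such as "orders-0" (an assumption of this sketch). */
    static Result handle(Map<String, Err> partitionErrors) {
        Set<String> unauthorizedTopics = new LinkedHashSet<>();
        List<String> log = new ArrayList<>();

        for (Map.Entry<String, Err> entry : partitionErrors.entrySet()) {
            String tp = entry.getKey();
            Err error = entry.getValue();
            if (error == Err.NONE) {
                continue;
            } else if (error == Err.TOPIC_AUTHORIZATION_FAILED) {
                // Collect silently; the aggregated message is logged after the loop.
                unauthorizedTopics.add(topicOf(tp));
            } else if (error.retriable) {
                log.add("WARN OffsetCommit failed on partition " + tp + ": " + error);
                return new Result(Outcome.RETRY, unauthorizedTopics, log);
            } else {
                log.add("ERROR OffsetCommit failed on partition " + tp + ": " + error);
                return new Result(Outcome.FATAL, unauthorizedTopics, log);
            }
        }

        if (!unauthorizedTopics.isEmpty()) {
            log.add("ERROR Not authorized to commit to topics " + unauthorizedTopics);
            return new Result(Outcome.UNAUTHORIZED, unauthorizedTopics, log);
        }
        return new Result(Outcome.SUCCESS, unauthorizedTopics, log);
    }

    private static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }
}
```

In this sketch a failed response produces exactly one failure log line: either the first retriable or fatal error, or a single aggregated message naming the unauthorized topics. Whether that matches the logging of the old consumer would still need to be checked.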