lucasbru commented on PR #14639:
URL: https://github.com/apache/kafka/pull/14639#issuecomment-1782544764

   Changes look good to me. Did I understand David correctly that we want to 
add more changes to this PR?
   
   > 1. Mockito: Could you be more specific on how you expect to mock the 
response object?
   
   That was just out of interest: since you are already using Mockito to mock 
things, why not use it for the response object as well? But I realize that the 
object is a pretty plain data container, so I guess the answer is that 
constructing it directly is easier.
   
   > 2. Error handling: Essentially all errors in 
`continueHandlePartitionErrors` can only happen in the response. I understand 
there is some redundancy there and it can be a bit confusing. But the response 
can be either a hard failure (response = null and throwable = non-null) or a 
server-side error (response = non-null). That is why it was kept separate. 
If you find it unclear, how do I make it more readable?
   
   That's not what I meant. I just found the flow of `onResponse` a bit hard to 
read, so I suggested structuring it a bit differently. In particular, 
`continueHandlePartitionErrors` sounds like it's determining whether to 
continue handling errors, but it's actually completing the future inside.
   
   I think the control flow may also lead to logging in a way that may not be 
intended? For fencing, we get one generic `log.error` message and one very 
similar `log.info` message with some specific details. For topic authorization 
errors, we get one `log.error` message for each partition, and then another 
aggregated `log.error` message, listing the partitions again. However, if we 
have 3 unauthorized topic errors and then one other error, we get three generic 
error messages for the topics, but we do not get the aggregated error message. 
Is that trying to be consistent with the old consumer?
   
   I imagined something like:
   
   ```
   if (error == Errors.NONE) {
       log.debug("OffsetCommit {} for partition {}", offset, tp);
   } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
       // Collect all unauthorized topics before failing
       unauthorizedTopics.add(tp.topic());
   } else if (error.exception() instanceof RetriableException) {
       // `error` is an Errors enum value, so check its exception type
       log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
       handleRetriableError(error);
       retry(responseTime);
       return;
   } else {
       log.error("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
       handleFatalError(error);
       return;
   }
   ```
   
   But I'm not super familiar with the code style in the new consumer, and 
consistency with the rest of the code and the old consumer is also important. 
So I just wanted to offer this as inspiration; you are probably in the best 
position to decide how to implement it.
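
   To make the logging point concrete, here is a rough, self-contained sketch 
of the same flow using stand-in types instead of the real Kafka classes. 
Everything in it is hypothetical (`Err`, `Outcome`, `Result`, `handle`, and the 
`topic-partition` string keys are illustration only), and it assumes we want 
at most one failure log line per response, with unauthorized topics reported 
once in aggregate:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustration only: none of these types are part of the Kafka client API.
class CommitResponseSketch {
    // Hypothetical, heavily simplified stand-in for the Errors enum.
    enum Err {
        NONE(false),
        TOPIC_AUTHORIZATION_FAILED(false),
        COORDINATOR_LOAD_IN_PROGRESS(true),
        FENCED_INSTANCE_ID(false);

        final boolean retriable;

        Err(boolean retriable) {
            this.retriable = retriable;
        }
    }

    enum Outcome { SUCCESS, RETRY, FATAL, UNAUTHORIZED }

    // Carries the decision plus the log lines that were produced on the way.
    static final class Result {
        final Outcome outcome;
        final List<String> logLines;

        Result(Outcome outcome, List<String> logLines) {
            this.outcome = outcome;
            this.logLines = logLines;
        }
    }

    // Assumes keys look like "topic-partition", e.g. "orders-0".
    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }

    static Result handle(Map<String, Err> errorsByPartition) {
        List<String> log = new ArrayList<>();
        Set<String> unauthorizedTopics = new LinkedHashSet<>();
        for (Map.Entry<String, Err> entry : errorsByPartition.entrySet()) {
            String tp = entry.getKey();
            Err error = entry.getValue();
            if (error == Err.NONE) {
                continue;
            } else if (error == Err.TOPIC_AUTHORIZATION_FAILED) {
                // Collect silently; the topics are reported once after the loop.
                unauthorizedTopics.add(topicOf(tp));
            } else if (error.retriable) {
                log.add("WARN OffsetCommit failed on partition " + tp + ": " + error);
                return new Result(Outcome.RETRY, log);
            } else {
                log.add("ERROR OffsetCommit failed on partition " + tp + ": " + error);
                return new Result(Outcome.FATAL, log);
            }
        }
        if (!unauthorizedTopics.isEmpty()) {
            log.add("ERROR Not authorized to commit to topics " + unauthorizedTopics);
            return new Result(Outcome.UNAUTHORIZED, log);
        }
        return new Result(Outcome.SUCCESS, log);
    }
}
```

   In this sketch, three unauthorized partitions followed by one fenced 
partition produce a single error line and a `FATAL` outcome, while three 
unauthorized partitions on their own produce one aggregated error line. Whether 
that matches the behaviour we want relative to the old consumer is still the 
open question above.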

