This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 77ae47d KAFKA-13258/13259/13260: Fix error response generation
(#11300)
77ae47d is described below
commit 77ae47d85b310eaf2902bc9843d870faf44203f3
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Sep 8 11:12:55 2021 +0200
KAFKA-13258/13259/13260: Fix error response generation (#11300)
AlterClientQuotas, DescribeProducers and FindCoordinator have issues when
building error responses. This can lead to brokers returning responses without
errors even when some have happened.
Reviewers: David Jacot <[email protected]>
---
.../kafka/common/requests/AlterClientQuotasRequest.java | 7 ++++++-
.../kafka/common/requests/DescribeProducersRequest.java | 1 +
.../apache/kafka/common/requests/FindCoordinatorResponse.java | 11 ++++++++++-
.../org/apache/kafka/common/requests/RequestResponseTest.java | 5 +++++
4 files changed, 22 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
index d03c267..3b06348 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasRequest.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.common.message.AlterClientQuotasRequestData.OpData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
@@ -117,6 +118,7 @@ public class AlterClientQuotasRequest extends
AbstractRequest {
@Override
public AlterClientQuotasResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
+ Errors error = Errors.forException(e);
List<AlterClientQuotasResponseData.EntryData> responseEntries = new
ArrayList<>();
for (EntryData entryData : data.entries()) {
List<AlterClientQuotasResponseData.EntityData> responseEntities =
new ArrayList<>();
@@ -125,7 +127,10 @@ public class AlterClientQuotasRequest extends
AbstractRequest {
.setEntityType(entityData.entityType())
.setEntityName(entityData.entityName()));
}
- responseEntries.add(new
AlterClientQuotasResponseData.EntryData().setEntity(responseEntities));
+ responseEntries.add(new AlterClientQuotasResponseData.EntryData()
+ .setEntity(responseEntities)
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message()));
}
AlterClientQuotasResponseData responseData = new
AlterClientQuotasResponseData()
.setThrottleTimeMs(throttleTimeMs)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
index 77e7ecc..39aab22 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
@@ -80,6 +80,7 @@ public class DescribeProducersRequest extends AbstractRequest
{
.setErrorCode(error.code())
);
}
+ response.topics().add(topicResponse);
}
return new DescribeProducersResponse(response);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 156277a..080ba24 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -73,7 +74,15 @@ public class FindCoordinatorResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return errorCounts(error());
+ if (!data.coordinators().isEmpty()) {
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ for (Coordinator coordinator : data.coordinators()) {
+ updateErrorCounts(errorCounts,
Errors.forCode(coordinator.errorCode()));
+ }
+ return errorCounts;
+ } else {
+ return errorCounts(error());
+ }
}
public static FindCoordinatorResponse parse(ByteBuffer buffer, short
version) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 887d15c..118a424 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -706,6 +706,11 @@ public class RequestResponseTest {
private void checkErrorResponse(AbstractRequest req, Throwable e, boolean
checkEqualityAndHashCode) {
AbstractResponse response = req.getErrorResponse(e);
checkResponse(response, req.version(), checkEqualityAndHashCode);
+ Errors error = Errors.forException(e);
+ Map<Errors, Integer> errorCounts = response.errorCounts();
+ assertEquals(Collections.singleton(error), errorCounts.keySet(),
+ "API Key " + req.apiKey().name + " v" + req.version() + " failed
errorCounts test");
+ assertTrue(errorCounts.get(error) > 0);
if (e instanceof UnknownServerException) {
String responseStr = response.toString();
assertFalse(responseStr.contains(e.getMessage()),