This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ebd32b MINOR: Use top-level error in
`UpdateFeaturesRequest.getErrorResponse` (#9781)
3ebd32b is described below
commit 3ebd32b2a563440645de45ecb578d8a8dccbee95
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Dec 29 13:41:56 2020 -0800
MINOR: Use top-level error in `UpdateFeaturesRequest.getErrorResponse`
(#9781)
The current `getErrorResponse` sets all of the feature errors, but does not
set a top-level error. It seems like the whole point of having the top-level
error is so that it could be used in cases like this. This patch also updates
`errorCounts` in `UpdateFeaturesResponse` so that the top-level error is
included.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 9 ++--
.../common/requests/UpdateFeaturesRequest.java | 28 ++++------
.../common/requests/UpdateFeaturesResponse.java | 21 ++++----
.../common/requests/UpdateFeaturesRequestTest.java | 56 ++++++++++++++++++++
.../requests/UpdateFeaturesResponseTest.java | 61 ++++++++++++++++++++++
5 files changed, 141 insertions(+), 34 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 857d502..bbdd2dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4421,8 +4421,8 @@ public class KafkaAdminClient extends AdminClient {
final UpdateFeaturesResponse response =
(UpdateFeaturesResponse) abstractResponse;
- Errors topLevelError =
Errors.forCode(response.data().errorCode());
- switch (topLevelError) {
+ ApiError topLevelError = response.topLevelError();
+ switch (topLevelError.error()) {
case NONE:
for (final UpdatableFeatureResult result :
response.data().results()) {
final KafkaFutureImpl<Void> future =
updateFutures.get(result.feature());
@@ -4442,12 +4442,11 @@ public class KafkaAdminClient extends AdminClient {
feature -> "The controller response did not
contain a result for feature " + feature);
break;
case NOT_CONTROLLER:
- handleNotControllerError(topLevelError);
+ handleNotControllerError(topLevelError.error());
break;
default:
for (final Map.Entry<String, KafkaFutureImpl<Void>>
entry : updateFutures.entrySet()) {
- final String errorMsg =
response.data().errorMessage();
-
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
+
entry.getValue().completeExceptionally(topLevelError.exception());
}
break;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
index fc18924..7a6bf66 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
@@ -16,15 +16,13 @@
*/
package org.apache.kafka.common.requests;
-import java.nio.ByteBuffer;
-import
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKey;
-import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
-import
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
-import
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResultCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
public class UpdateFeaturesRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<UpdateFeaturesRequest> {
@@ -55,20 +53,12 @@ public class UpdateFeaturesRequest extends AbstractRequest {
}
@Override
- public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
- final ApiError apiError = ApiError.fromThrowable(e);
- final UpdatableFeatureResultCollection results = new
UpdatableFeatureResultCollection();
- for (FeatureUpdateKey update : this.data.featureUpdates().valuesSet())
{
- final UpdatableFeatureResult result = new UpdatableFeatureResult()
- .setFeature(update.feature())
- .setErrorCode(apiError.error().code())
- .setErrorMessage(apiError.message());
- results.add(result);
- }
- final UpdateFeaturesResponseData responseData = new
UpdateFeaturesResponseData()
- .setThrottleTimeMs(throttleTimeMs)
- .setResults(results);
- return new UpdateFeaturesResponse(responseData);
+ public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
+ return UpdateFeaturesResponse.createWithErrors(
+ ApiError.fromThrowable(e),
+ Collections.emptyMap(),
+ throttleTimeMs
+ );
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
index 028f885..26825a0 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
@@ -16,10 +16,6 @@
*/
package org.apache.kafka.common.requests;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.stream.Collectors;
-
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
import
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResultCollection;
@@ -27,6 +23,9 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
/**
* Possible error codes:
@@ -45,16 +44,18 @@ public class UpdateFeaturesResponse extends
AbstractResponse {
this.data = data;
}
- public Map<String, ApiError> errors() {
- return data.results().valuesSet().stream().collect(
- Collectors.toMap(
- result -> result.feature(),
- result -> new ApiError(Errors.forCode(result.errorCode()),
result.errorMessage())));
+ public ApiError topLevelError() {
+ return new ApiError(Errors.forCode(data.errorCode()),
data.errorMessage());
}
@Override
public Map<Errors, Integer> errorCounts() {
- return apiErrorCounts(errors());
+ Map<Errors, Integer> errorCounts = new HashMap<>();
+ updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
+ for (UpdatableFeatureResult result : data.results()) {
+ updateErrorCounts(errorCounts, Errors.forCode(result.errorCode()));
+ }
+ return errorCounts;
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
new file mode 100644
index 0000000..9c625b5
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.UpdateFeaturesRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class UpdateFeaturesRequestTest {
+
+ @Test
+ public void testGetErrorResponse() {
+ UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
+ new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+
+ features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+ .setFeature("foo")
+ .setMaxVersionLevel((short) 2)
+ );
+
+ features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+ .setFeature("bar")
+ .setMaxVersionLevel((short) 3)
+ );
+
+ UpdateFeaturesRequest request = new UpdateFeaturesRequest(
+ new UpdateFeaturesRequestData().setFeatureUpdates(features),
+ UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION
+ );
+
+ UpdateFeaturesResponse response = request.getErrorResponse(0, new
UnknownServerException());
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR,
response.topLevelError().error());
+ assertEquals(0, response.data().results().size());
+ assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 1),
response.errorCounts());
+ }
+
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesResponseTest.java
new file mode 100644
index 0000000..54b64f9
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesResponseTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class UpdateFeaturesResponseTest {
+
+ @Test
+ public void testErrorCounts() {
+ UpdateFeaturesResponseData.UpdatableFeatureResultCollection results =
+ new UpdateFeaturesResponseData.UpdatableFeatureResultCollection();
+
+ results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
+ .setFeature("foo")
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ );
+
+ results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
+ .setFeature("bar")
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ );
+
+ results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
+ .setFeature("baz")
+ .setErrorCode(Errors.FEATURE_UPDATE_FAILED.code())
+ );
+
+ UpdateFeaturesResponse response = new UpdateFeaturesResponse(new
UpdateFeaturesResponseData()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setResults(results)
+ );
+
+ Map<Errors, Integer> errorCounts = response.errorCounts();
+ assertEquals(3, errorCounts.size());
+ assertEquals(1, errorCounts.get(Errors.INVALID_REQUEST).intValue());
+ assertEquals(2,
errorCounts.get(Errors.UNKNOWN_SERVER_ERROR).intValue());
+ assertEquals(1,
errorCounts.get(Errors.FEATURE_UPDATE_FAILED).intValue());
+ }
+
+}