This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ebd32b  MINOR: Use top-level error in 
`UpdateFeaturesRequest.getErrorResponse` (#9781)
3ebd32b is described below

commit 3ebd32b2a563440645de45ecb578d8a8dccbee95
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Dec 29 13:41:56 2020 -0800

    MINOR: Use top-level error in `UpdateFeaturesRequest.getErrorResponse` 
(#9781)
    
    The current `getErrorResponse` sets all of the feature errors, but does not 
set a top-level error. It seems like the whole point of having the top-level 
error is so that it could be used in cases like this. This patch also updates 
`errorCounts` in `UpdateFeaturesResponse` so that the top-level error is 
included.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  9 ++--
 .../common/requests/UpdateFeaturesRequest.java     | 28 ++++------
 .../common/requests/UpdateFeaturesResponse.java    | 21 ++++----
 .../common/requests/UpdateFeaturesRequestTest.java | 56 ++++++++++++++++++++
 .../requests/UpdateFeaturesResponseTest.java       | 61 ++++++++++++++++++++++
 5 files changed, 141 insertions(+), 34 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 857d502..bbdd2dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4421,8 +4421,8 @@ public class KafkaAdminClient extends AdminClient {
                 final UpdateFeaturesResponse response =
                     (UpdateFeaturesResponse) abstractResponse;
 
-                Errors topLevelError = 
Errors.forCode(response.data().errorCode());
-                switch (topLevelError) {
+                ApiError topLevelError = response.topLevelError();
+                switch (topLevelError.error()) {
                     case NONE:
                         for (final UpdatableFeatureResult result : 
response.data().results()) {
                             final KafkaFutureImpl<Void> future = 
updateFutures.get(result.feature());
@@ -4442,12 +4442,11 @@ public class KafkaAdminClient extends AdminClient {
                             feature -> "The controller response did not 
contain a result for feature " + feature);
                         break;
                     case NOT_CONTROLLER:
-                        handleNotControllerError(topLevelError);
+                        handleNotControllerError(topLevelError.error());
                         break;
                     default:
                         for (final Map.Entry<String, KafkaFutureImpl<Void>> 
entry : updateFutures.entrySet()) {
-                            final String errorMsg = 
response.data().errorMessage();
-                            
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
+                            
entry.getValue().completeExceptionally(topLevelError.exception());
                         }
                         break;
                 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
index fc18924..7a6bf66 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
@@ -16,15 +16,13 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import 
org.apache.kafka.common.message.UpdateFeaturesRequestData.FeatureUpdateKey;
-import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import org.apache.kafka.common.message.UpdateFeaturesRequestData;
-import 
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
-import 
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResultCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
 public class UpdateFeaturesRequest extends AbstractRequest {
 
     public static class Builder extends 
AbstractRequest.Builder<UpdateFeaturesRequest> {
@@ -55,20 +53,12 @@ public class UpdateFeaturesRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        final ApiError apiError = ApiError.fromThrowable(e);
-        final UpdatableFeatureResultCollection results = new 
UpdatableFeatureResultCollection();
-        for (FeatureUpdateKey update : this.data.featureUpdates().valuesSet()) 
{
-            final UpdatableFeatureResult result = new UpdatableFeatureResult()
-                .setFeature(update.feature())
-                .setErrorCode(apiError.error().code())
-                .setErrorMessage(apiError.message());
-            results.add(result);
-        }
-        final UpdateFeaturesResponseData responseData = new 
UpdateFeaturesResponseData()
-            .setThrottleTimeMs(throttleTimeMs)
-            .setResults(results);
-        return new UpdateFeaturesResponse(responseData);
+    public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+        return UpdateFeaturesResponse.createWithErrors(
+            ApiError.fromThrowable(e),
+            Collections.emptyMap(),
+            throttleTimeMs
+        );
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
index 028f885..26825a0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesResponse.java
@@ -16,10 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import org.apache.kafka.common.message.UpdateFeaturesResponseData;
 import 
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
 import 
org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResultCollection;
@@ -27,6 +23,9 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Possible error codes:
@@ -45,16 +44,18 @@ public class UpdateFeaturesResponse extends 
AbstractResponse {
         this.data = data;
     }
 
-    public Map<String, ApiError> errors() {
-        return data.results().valuesSet().stream().collect(
-            Collectors.toMap(
-                result -> result.feature(),
-                result -> new ApiError(Errors.forCode(result.errorCode()), 
result.errorMessage())));
+    public ApiError topLevelError() {
+        return new ApiError(Errors.forCode(data.errorCode()), 
data.errorMessage());
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return apiErrorCounts(errors());
+        Map<Errors, Integer> errorCounts = new HashMap<>();
+        updateErrorCounts(errorCounts, Errors.forCode(data.errorCode()));
+        for (UpdatableFeatureResult result : data.results()) {
+            updateErrorCounts(errorCounts, Errors.forCode(result.errorCode()));
+        }
+        return errorCounts;
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
new file mode 100644
index 0000000..9c625b5
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesRequestTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.message.UpdateFeaturesRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+
+public class UpdateFeaturesRequestTest {
+
+    @Test
+    public void testGetErrorResponse() {
+        UpdateFeaturesRequestData.FeatureUpdateKeyCollection features =
+            new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("foo")
+            .setMaxVersionLevel((short) 2)
+        );
+
+        features.add(new UpdateFeaturesRequestData.FeatureUpdateKey()
+            .setFeature("bar")
+            .setMaxVersionLevel((short) 3)
+        );
+
+        UpdateFeaturesRequest request = new UpdateFeaturesRequest(
+            new UpdateFeaturesRequestData().setFeatureUpdates(features),
+            UpdateFeaturesRequestData.HIGHEST_SUPPORTED_VERSION
+        );
+
+        UpdateFeaturesResponse response = request.getErrorResponse(0, new 
UnknownServerException());
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR, 
response.topLevelError().error());
+        assertEquals(0, response.data().results().size());
+        assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 1), 
response.errorCounts());
+    }
+
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesResponseTest.java
new file mode 100644
index 0000000..54b64f9
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/UpdateFeaturesResponseTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.UpdateFeaturesResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class UpdateFeaturesResponseTest {
+
+    @Test
+    public void testErrorCounts() {
+        UpdateFeaturesResponseData.UpdatableFeatureResultCollection results =
+            new UpdateFeaturesResponseData.UpdatableFeatureResultCollection();
+
+        results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
+            .setFeature("foo")
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+        );
+
+        results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
+            .setFeature("bar")
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+        );
+
+        results.add(new UpdateFeaturesResponseData.UpdatableFeatureResult()
+            .setFeature("baz")
+            .setErrorCode(Errors.FEATURE_UPDATE_FAILED.code())
+        );
+
+        UpdateFeaturesResponse response = new UpdateFeaturesResponse(new 
UpdateFeaturesResponseData()
+            .setErrorCode(Errors.INVALID_REQUEST.code())
+            .setResults(results)
+        );
+
+        Map<Errors, Integer> errorCounts = response.errorCounts();
+        assertEquals(3, errorCounts.size());
+        assertEquals(1, errorCounts.get(Errors.INVALID_REQUEST).intValue());
+        assertEquals(2, 
errorCounts.get(Errors.UNKNOWN_SERVER_ERROR).intValue());
+        assertEquals(1, 
errorCounts.get(Errors.FEATURE_UPDATE_FAILED).intValue());
+    }
+
+}

Reply via email to