This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7bff678699a KAFKA-18859 honor the error message of UnregisterBrokerResponse (#19027)
7bff678699a is described below
commit 7bff678699ae5f69c18e2f86a43acff0d160a606
Author: Ken Huang <[email protected]>
AuthorDate: Sun Mar 16 03:06:01 2025 +0800
KAFKA-18859 honor the error message of UnregisterBrokerResponse (#19027)
Reviewers: Ismael Juma <[email protected]>, TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 6 ++---
.../common/requests/UnregisterBrokerRequest.java | 3 ++-
.../kafka/common/requests/RequestResponseTest.java | 27 ++++++++++++++++++----
.../main/scala/kafka/server/ControllerApis.scala | 4 +---
4 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f6f484ca7bf..c4dcceae63c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4794,11 +4794,11 @@ public class KafkaAdminClient extends AdminClient {
future.complete(null);
break;
case REQUEST_TIMED_OUT:
- throw error.exception();
+ throw error.exception(response.data().errorMessage());
default:
log.error("Unregister broker request for broker ID {} failed: {}",
- brokerId, error.message());
- future.completeExceptionally(error.exception());
+ brokerId, response.data().errorMessage());
+ future.completeExceptionally(error.exception(response.data().errorMessage()));
break;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
index 253499f85af..d0cbf715ddb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
@@ -57,7 +57,8 @@ public class UnregisterBrokerRequest extends AbstractRequest {
Errors error = Errors.forException(e);
return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setThrottleTimeMs(throttleTimeMs)
- .setErrorCode(error.code()));
+ .setErrorCode(error.code())
+ .setErrorMessage(e.getMessage()));
}
public static UnregisterBrokerRequest parse(ByteBuffer buffer, short version) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 1a5050907c0..cca885f591c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -315,7 +315,7 @@ import static org.apache.kafka.common.protocol.ApiKeys.OFFSET_FETCH;
import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
-import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS;
+import static org.apache.kafka.common.protocol.ApiKeys.UNREGISTER_BROKER;
import static org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -336,11 +336,13 @@ public class RequestResponseTest {
public void testSerialization() {
Map<ApiKeys, List<Short>> toSkip = new HashMap<>();
// It's not possible to create a MetadataRequest v0 via the builder
- toSkip.put(METADATA, singletonList((short) 0));
+ toSkip.put(METADATA, List.of((short) 0));
// DescribeLogDirsResponse v0, v1 and v2 don't have a top level error field
- toSkip.put(DESCRIBE_LOG_DIRS, Arrays.asList((short) 0, (short) 1, (short) 2));
+ toSkip.put(DESCRIBE_LOG_DIRS, List.of((short) 0, (short) 1, (short) 2));
// ElectLeaders v0 does not have a top level error field, when accessing it, it defaults to NONE
- toSkip.put(ELECT_LEADERS, singletonList((short) 0));
+ toSkip.put(ELECT_LEADERS, List.of((short) 0));
+ // UnregisterBroker v0 contains the error message in the response
+ toSkip.put(UNREGISTER_BROKER, List.of((short) 0));
for (ApiKeys apikey : ApiKeys.values()) {
for (short version : apikey.allVersions()) {
@@ -838,6 +840,23 @@ public class RequestResponseTest {
}
}
+ @Test
+ public void testUnregisterBrokerResponseWithUnknownServerError() {
+ UnregisterBrokerRequest request = new UnregisterBrokerRequest.Builder(
+ new UnregisterBrokerRequestData()
+ ).build((short) 0);
+ String customerErrorMessage = "customer error message";
+
+ UnregisterBrokerResponse response = request.getErrorResponse(
+ 0,
+ new RuntimeException(customerErrorMessage)
+ );
+
+ assertEquals(0, response.throttleTimeMs());
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), response.data().errorCode());
+ assertEquals(customerErrorMessage, response.data().errorMessage());
+ }
+
private ApiVersionsResponse defaultApiVersionsResponse() {
return TestUtils.defaultApiVersionsResponse(ApiMessageType.ListenerType.BROKER);
}
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 1e343e776d5..0b3c1ea1dac 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -642,9 +642,7 @@ class ControllerApis(
def createResponseCallback(requestThrottleMs: Int,
e: Throwable): UnregisterBrokerResponse = {
if (e != null) {
- new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
- setThrottleTimeMs(requestThrottleMs).
- setErrorCode(Errors.forException(e).code))
+ decommissionRequest.getErrorResponse(requestThrottleMs, e)
} else {
new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
setThrottleTimeMs(requestThrottleMs))