This is an automated email from the ASF dual-hosted git repository. mittal pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6fac2d07002 MINOR: Prevent error message swallow in persister. (#20238) 6fac2d07002 is described below commit 6fac2d07002f99bc4f6d9fc590acb04f65a6b2de Author: Sushant Mahajan <smaha...@confluent.io> AuthorDate: Thu Jul 24 19:29:43 2025 +0530 MINOR: Prevent error message swallow in persister. (#20238) * The persister receives the response from the share coordinator and performs various actions based on the response error code. * There was an issue in handling the error code and error message. We were creating an Errors object from the error code in the response but ignoring the error message carried in the same response. * As a result, when a standard error code was accompanied by a custom message, the logs did not contain the complete error information. * In this PR, we rectify the situation by first checking the error message in the response and, if it is null or empty, falling back to the standard message from the Errors object. The resolved message is now also passed as the exception in non-retriable error responses instead of null. Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal <apoorvmitta...@gmail.com> --- .../share/persister/PersisterStateManager.java | 63 ++++++++++++++++------ 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index 4569455aee1..6189e8b9b13 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -442,6 +442,10 @@ public class PersisterStateManager { FindCoordinatorResponseData.Coordinator coordinatorData = coordinators.get(0); Errors error = Errors.forCode(coordinatorData.errorCode()); + String errorMessage = coordinatorData.errorMessage(); + if (errorMessage == null || errorMessage.isEmpty()) { + errorMessage = error.message(); + } switch 
(error) { case NONE: @@ -460,7 +464,7 @@ public class PersisterStateManager { case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message()); + log.debug("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), errorMessage); if (!findCoordBackoff.canAttempt()) { log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey()); findCoordinatorErrorResponse(error, new Exception("Exhausted max retries to find coordinator without success.")); @@ -472,7 +476,7 @@ public class PersisterStateManager { default: log.error("Unable to find coordinator for {} using key {}.", name(), partitionKey()); - findCoordinatorErrorResponse(error, null); + findCoordinatorErrorResponse(error, new Exception(errorMessage)); } } @@ -572,6 +576,11 @@ public class PersisterStateManager { if (partitionStateData.isPresent()) { Errors error = Errors.forCode(partitionStateData.get().errorCode()); + String errorMessage = partitionStateData.get().errorMessage(); + if (errorMessage == null || errorMessage.isEmpty()) { + errorMessage = error.message(); + } + switch (error) { case NONE: initializeStateBackoff.resetAttempts(); @@ -588,7 +597,7 @@ public class PersisterStateManager { case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), errorMessage); if (!initializeStateBackoff.canAttempt()) { log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete initialize state RPC without success.")); @@ -599,8 
+608,8 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform initialize state RPC for key {}: {}", partitionKey(), error.message()); - requestErrorResponse(error, null); + log.error("Unable to perform initialize state RPC for key {}: {}", partitionKey(), errorMessage); + requestErrorResponse(error, new Exception(errorMessage)); return; } } @@ -731,6 +740,11 @@ public class PersisterStateManager { if (partitionStateData.isPresent()) { Errors error = Errors.forCode(partitionStateData.get().errorCode()); + String errorMessage = partitionStateData.get().errorMessage(); + if (errorMessage == null || errorMessage.isEmpty()) { + errorMessage = error.message(); + } + switch (error) { case NONE: writeStateBackoff.resetAttempts(); @@ -747,7 +761,7 @@ public class PersisterStateManager { case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in write state RPC for key {}: {}", partitionKey(), errorMessage); if (!writeStateBackoff.canAttempt()) { log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete write state RPC without success.")); @@ -758,8 +772,8 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform write state RPC for key {}: {}", partitionKey(), error.message()); - requestErrorResponse(error, null); + log.error("Unable to perform write state RPC for key {}: {}", partitionKey(), errorMessage); + requestErrorResponse(error, new Exception(errorMessage)); return; } } @@ -874,6 +888,11 @@ public class PersisterStateManager { if (partitionStateData.isPresent()) { Errors error = Errors.forCode(partitionStateData.get().errorCode()); + String errorMessage = partitionStateData.get().errorMessage(); + if 
(errorMessage == null || errorMessage.isEmpty()) { + errorMessage = error.message(); + } + switch (error) { case NONE: readStateBackoff.resetAttempts(); @@ -890,7 +909,7 @@ public class PersisterStateManager { case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in read state RPC for key {}: {}", partitionKey(), errorMessage); if (!readStateBackoff.canAttempt()) { log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete read state RPC without success.")); @@ -901,8 +920,8 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), error.message()); - requestErrorResponse(error, null); + log.error("Unable to perform read state RPC for key {}: {}", partitionKey(), errorMessage); + requestErrorResponse(error, new Exception(errorMessage)); return; } } @@ -1017,6 +1036,11 @@ public class PersisterStateManager { if (partitionStateData.isPresent()) { Errors error = Errors.forCode(partitionStateData.get().errorCode()); + String errorMessage = partitionStateData.get().errorMessage(); + if (errorMessage == null || errorMessage.isEmpty()) { + errorMessage = error.message(); + } + switch (error) { case NONE: readStateSummaryBackoff.resetAttempts(); @@ -1033,7 +1057,7 @@ public class PersisterStateManager { case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), errorMessage); if (!readStateSummaryBackoff.canAttempt()) { log.error("Exhausted max retries for read 
state summary RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success.")); @@ -1044,8 +1068,8 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), error.message()); - requestErrorResponse(error, null); + log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), errorMessage); + requestErrorResponse(error, new Exception(errorMessage)); return; } } @@ -1157,6 +1181,11 @@ public class PersisterStateManager { if (partitionStateData.isPresent()) { Errors error = Errors.forCode(partitionStateData.get().errorCode()); + String errorMessage = partitionStateData.get().errorMessage(); + if (errorMessage == null || errorMessage.isEmpty()) { + errorMessage = error.message(); + } + switch (error) { case NONE: deleteStateBackoff.resetAttempts(); @@ -1173,7 +1202,7 @@ public class PersisterStateManager { case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: case UNKNOWN_TOPIC_OR_PARTITION: - log.debug("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message()); + log.debug("Received retriable error in delete state RPC for key {}: {}", partitionKey(), errorMessage); if (!deleteStateBackoff.canAttempt()) { log.error("Exhausted max retries for delete state RPC for key {} without success.", partitionKey()); requestErrorResponse(error, new Exception("Exhausted max retries to complete delete state RPC without success.")); @@ -1184,8 +1213,8 @@ public class PersisterStateManager { return; default: - log.error("Unable to perform delete state RPC for key {}: {}", partitionKey(), error.message()); - requestErrorResponse(error, null); + log.error("Unable to perform delete state RPC for key {}: {}", partitionKey(), errorMessage); + requestErrorResponse(error, new Exception(errorMessage)); return; } }