This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6fac2d07002 MINOR: Prevent error message swallow in persister. (#20238)
6fac2d07002 is described below

commit 6fac2d07002f99bc4f6d9fc590acb04f65a6b2de
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Thu Jul 24 19:29:43 2025 +0530

    MINOR: Prevent error message swallow in persister. (#20238)
    
    * The persister receives the response from the share coordinator and
    performs various actions based on the response error code.
    * There was an issue in handling the error code and error message. We
    were creating an Errors object from the error code in the response but
    ignoring the error message carried in the same response.
    * As a result, when a standard error code comes with a custom
    message, the logs do not show the complete information.
    * In this PR, we rectify the situation by first checking the error
    message in the response and, if it is null or empty, falling back to
    the standard message from the Errors object.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>, Apoorv Mittal
    <apoorvmitta...@gmail.com>
---
 .../share/persister/PersisterStateManager.java     | 63 ++++++++++++++++------
 1 file changed, 46 insertions(+), 17 deletions(-)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 4569455aee1..6189e8b9b13 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -442,6 +442,10 @@ public class PersisterStateManager {
 
             FindCoordinatorResponseData.Coordinator coordinatorData = 
coordinators.get(0);
             Errors error = Errors.forCode(coordinatorData.errorCode());
+            String errorMessage = coordinatorData.errorMessage();
+            if (errorMessage == null || errorMessage.isEmpty()) {
+                errorMessage = error.message();
+            }
 
             switch (error) {
                 case NONE:
@@ -460,7 +464,7 @@ public class PersisterStateManager {
                 case COORDINATOR_LOAD_IN_PROGRESS:
                 case NOT_COORDINATOR:
                 case UNKNOWN_TOPIC_OR_PARTITION:
-                    log.debug("Received retriable error in find coordinator 
for {} using key {}: {}", name(), partitionKey(), error.message());
+                    log.debug("Received retriable error in find coordinator 
for {} using key {}: {}", name(), partitionKey(), errorMessage);
                     if (!findCoordBackoff.canAttempt()) {
                         log.error("Exhausted max retries to find coordinator 
for {} using key {} without success.", name(), partitionKey());
                         findCoordinatorErrorResponse(error, new 
Exception("Exhausted max retries to find coordinator without success."));
@@ -472,7 +476,7 @@ public class PersisterStateManager {
 
                 default:
                     log.error("Unable to find coordinator for {} using key 
{}.", name(), partitionKey());
-                    findCoordinatorErrorResponse(error, null);
+                    findCoordinatorErrorResponse(error, new 
Exception(errorMessage));
             }
         }
 
@@ -572,6 +576,11 @@ public class PersisterStateManager {
 
                     if (partitionStateData.isPresent()) {
                         Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        String errorMessage = 
partitionStateData.get().errorMessage();
+                        if (errorMessage == null || errorMessage.isEmpty()) {
+                            errorMessage = error.message();
+                        }
+
                         switch (error) {
                             case NONE:
                                 initializeStateBackoff.resetAttempts();
@@ -588,7 +597,7 @@ public class PersisterStateManager {
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
                             case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), errorMessage);
                                 if (!initializeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for 
initialize state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete initialize state RPC without 
success."));
@@ -599,8 +608,8 @@ public class PersisterStateManager {
                                 return;
 
                             default:
-                                log.error("Unable to perform initialize state 
RPC for key {}: {}", partitionKey(), error.message());
-                                requestErrorResponse(error, null);
+                                log.error("Unable to perform initialize state 
RPC for key {}: {}", partitionKey(), errorMessage);
+                                requestErrorResponse(error, new 
Exception(errorMessage));
                                 return;
                         }
                     }
@@ -731,6 +740,11 @@ public class PersisterStateManager {
 
                     if (partitionStateData.isPresent()) {
                         Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        String errorMessage = 
partitionStateData.get().errorMessage();
+                        if (errorMessage == null || errorMessage.isEmpty()) {
+                            errorMessage = error.message();
+                        }
+
                         switch (error) {
                             case NONE:
                                 writeStateBackoff.resetAttempts();
@@ -747,7 +761,7 @@ public class PersisterStateManager {
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
                             case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in write 
state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in write 
state RPC for key {}: {}", partitionKey(), errorMessage);
                                 if (!writeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for write 
state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete write state RPC without 
success."));
@@ -758,8 +772,8 @@ public class PersisterStateManager {
                                 return;
 
                             default:
-                                log.error("Unable to perform write state RPC 
for key {}: {}", partitionKey(), error.message());
-                                requestErrorResponse(error, null);
+                                log.error("Unable to perform write state RPC 
for key {}: {}", partitionKey(), errorMessage);
+                                requestErrorResponse(error, new 
Exception(errorMessage));
                                 return;
                         }
                     }
@@ -874,6 +888,11 @@ public class PersisterStateManager {
 
                     if (partitionStateData.isPresent()) {
                         Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        String errorMessage = 
partitionStateData.get().errorMessage();
+                        if (errorMessage == null || errorMessage.isEmpty()) {
+                            errorMessage = error.message();
+                        }
+
                         switch (error) {
                             case NONE:
                                 readStateBackoff.resetAttempts();
@@ -890,7 +909,7 @@ public class PersisterStateManager {
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
                             case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), errorMessage);
                                 if (!readStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state RPC without success."));
@@ -901,8 +920,8 @@ public class PersisterStateManager {
                                 return;
 
                             default:
-                                log.error("Unable to perform read state RPC 
for key {}: {}", partitionKey(), error.message());
-                                requestErrorResponse(error, null);
+                                log.error("Unable to perform read state RPC 
for key {}: {}", partitionKey(), errorMessage);
+                                requestErrorResponse(error, new 
Exception(errorMessage));
                                 return;
                         }
                     }
@@ -1017,6 +1036,11 @@ public class PersisterStateManager {
 
                     if (partitionStateData.isPresent()) {
                         Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        String errorMessage = 
partitionStateData.get().errorMessage();
+                        if (errorMessage == null || errorMessage.isEmpty()) {
+                            errorMessage = error.message();
+                        }
+
                         switch (error) {
                             case NONE:
                                 readStateSummaryBackoff.resetAttempts();
@@ -1033,7 +1057,7 @@ public class PersisterStateManager {
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
                             case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), errorMessage);
                                 if (!readStateSummaryBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state summary RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete read state summary RPC without 
success."));
@@ -1044,8 +1068,8 @@ public class PersisterStateManager {
                                 return;
 
                             default:
-                                log.error("Unable to perform read state 
summary RPC for key {}: {}", partitionKey(), error.message());
-                                requestErrorResponse(error, null);
+                                log.error("Unable to perform read state 
summary RPC for key {}: {}", partitionKey(), errorMessage);
+                                requestErrorResponse(error, new 
Exception(errorMessage));
                                 return;
                         }
                     }
@@ -1157,6 +1181,11 @@ public class PersisterStateManager {
 
                     if (partitionStateData.isPresent()) {
                         Errors error = 
Errors.forCode(partitionStateData.get().errorCode());
+                        String errorMessage = 
partitionStateData.get().errorMessage();
+                        if (errorMessage == null || errorMessage.isEmpty()) {
+                            errorMessage = error.message();
+                        }
+
                         switch (error) {
                             case NONE:
                                 deleteStateBackoff.resetAttempts();
@@ -1173,7 +1202,7 @@ public class PersisterStateManager {
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
                             case UNKNOWN_TOPIC_OR_PARTITION:
-                                log.debug("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), error.message());
+                                log.debug("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), errorMessage);
                                 if (!deleteStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for 
delete state RPC for key {} without success.", partitionKey());
                                     requestErrorResponse(error, new 
Exception("Exhausted max retries to complete delete state RPC without 
success."));
@@ -1184,8 +1213,8 @@ public class PersisterStateManager {
                                 return;
 
                             default:
-                                log.error("Unable to perform delete state RPC 
for key {}: {}", partitionKey(), error.message());
-                                requestErrorResponse(error, null);
+                                log.error("Unable to perform delete state RPC 
for key {}: {}", partitionKey(), errorMessage);
+                                requestErrorResponse(error, new 
Exception(errorMessage));
                                 return;
                         }
                     }

Reply via email to