This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac583ad2c02 KAFKA-19455: Retry persister request on metadata image 
issues. (#20078)
ac583ad2c02 is described below

commit ac583ad2c02e47c33d5614248b28718ed09bf6e4
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Wed Jul 2 00:17:59 2025 +0530

    KAFKA-19455: Retry persister request on metadata image issues. (#20078)
    
    * If we get an `UNKNOWN_TOPIC_OR_PARTITION` response from the
    `ShareCoordinator`, it could imply a transient issue where the metadata
    image is not up to date.
    * In this case it makes sense to retry the request to give time for data
    to be available.
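    * In this PR, we treat `UNKNOWN_TOPIC_OR_PARTITION` as a retriable error
    in the `PersisterStateManager` handlers and set the metadata image in
    `ShareCoordinatorShard.onLoaded`.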
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../apache/kafka/server/share/persister/PersisterStateManager.java  | 6 ++++++
 .../org/apache/kafka/coordinator/share/ShareCoordinatorShard.java   | 1 +
 2 files changed, 7 insertions(+)

diff --git 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 98b08a3e50a..ccfae329c7b 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -454,6 +454,7 @@ public class PersisterStateManager {
                 case COORDINATOR_NOT_AVAILABLE: // retriable error codes
                 case COORDINATOR_LOAD_IN_PROGRESS:
                 case NOT_COORDINATOR:
+                case UNKNOWN_TOPIC_OR_PARTITION:
                     log.debug("Received retriable error in find coordinator 
for {} using key {}: {}", name(), partitionKey(), error.message());
                     if (!findCoordBackoff.canAttempt()) {
                         log.error("Exhausted max retries to find coordinator 
for {} using key {} without success.", name(), partitionKey());
@@ -581,6 +582,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
+                            case UNKNOWN_TOPIC_OR_PARTITION:
                                 log.debug("Received retriable error in 
initialize state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!initializeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for 
initialize state RPC for key {} without success.", partitionKey());
@@ -739,6 +741,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
+                            case UNKNOWN_TOPIC_OR_PARTITION:
                                 log.debug("Received retriable error in write 
state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!writeStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for write 
state RPC for key {} without success.", partitionKey());
@@ -881,6 +884,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
+                            case UNKNOWN_TOPIC_OR_PARTITION:
                                 log.debug("Received retriable error in read 
state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!readStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state RPC for key {} without success.", partitionKey());
@@ -1023,6 +1027,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
+                            case UNKNOWN_TOPIC_OR_PARTITION:
                                 log.debug("Received retriable error in read 
state summary RPC for key {}: {}", partitionKey(), error.message());
                                 if (!readStateSummaryBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for read 
state summary RPC for key {} without success.", partitionKey());
@@ -1162,6 +1167,7 @@ public class PersisterStateManager {
                             case COORDINATOR_NOT_AVAILABLE:
                             case COORDINATOR_LOAD_IN_PROGRESS:
                             case NOT_COORDINATOR:
+                            case UNKNOWN_TOPIC_OR_PARTITION:
                                 log.debug("Received retriable error in delete 
state RPC for key {}: {}", partitionKey(), error.message());
                                 if (!deleteStateBackoff.canAttempt()) {
                                     log.error("Exhausted max retries for 
delete state RPC for key {} without success.", partitionKey());
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index f5d9a3cfb30..86b2d506376 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -207,6 +207,7 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
 
     @Override
     public void onLoaded(MetadataImage newImage) {
+        this.metadataImage = newImage;
         coordinatorMetrics.activateMetricsShard(metricsShard);
     }
 

Reply via email to