This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ac583ad2c02 KAFKA-19455: Retry persister request on metadata image issues. (#20078) ac583ad2c02 is described below commit ac583ad2c02e47c33d5614248b28718ed09bf6e4 Author: Sushant Mahajan <smaha...@confluent.io> AuthorDate: Wed Jul 2 00:17:59 2025 +0530 KAFKA-19455: Retry persister request on metadata image issues. (#20078) * If we get an `UNKNOWN_TOPIC_OR_PARTITION` response from the `ShareCoordinator`, it could imply a transient issue where the metadata image is not up to date. * In this case it makes sense to retry the request to give time for data to be available. * In this PR, we are making the required change. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../apache/kafka/server/share/persister/PersisterStateManager.java | 6 ++++++ .../org/apache/kafka/coordinator/share/ShareCoordinatorShard.java | 1 + 2 files changed, 7 insertions(+) diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java index 98b08a3e50a..ccfae329c7b 100644 --- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java @@ -454,6 +454,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: // retriable error codes case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: + case UNKNOWN_TOPIC_OR_PARTITION: log.debug("Received retriable error in find coordinator for {} using key {}: {}", name(), partitionKey(), error.message()); if (!findCoordBackoff.canAttempt()) { log.error("Exhausted max retries to find coordinator for {} using key {} without success.", name(), partitionKey()); @@ -581,6 +582,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case 
NOT_COORDINATOR: + case UNKNOWN_TOPIC_OR_PARTITION: log.debug("Received retriable error in initialize state RPC for key {}: {}", partitionKey(), error.message()); if (!initializeStateBackoff.canAttempt()) { log.error("Exhausted max retries for initialize state RPC for key {} without success.", partitionKey()); @@ -739,6 +741,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: + case UNKNOWN_TOPIC_OR_PARTITION: log.debug("Received retriable error in write state RPC for key {}: {}", partitionKey(), error.message()); if (!writeStateBackoff.canAttempt()) { log.error("Exhausted max retries for write state RPC for key {} without success.", partitionKey()); @@ -881,6 +884,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: + case UNKNOWN_TOPIC_OR_PARTITION: log.debug("Received retriable error in read state RPC for key {}: {}", partitionKey(), error.message()); if (!readStateBackoff.canAttempt()) { log.error("Exhausted max retries for read state RPC for key {} without success.", partitionKey()); @@ -1023,6 +1027,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: + case UNKNOWN_TOPIC_OR_PARTITION: log.debug("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); if (!readStateSummaryBackoff.canAttempt()) { log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey()); @@ -1162,6 +1167,7 @@ public class PersisterStateManager { case COORDINATOR_NOT_AVAILABLE: case COORDINATOR_LOAD_IN_PROGRESS: case NOT_COORDINATOR: + case UNKNOWN_TOPIC_OR_PARTITION: log.debug("Received retriable error in delete state RPC for key {}: {}", partitionKey(), error.message()); if (!deleteStateBackoff.canAttempt()) { log.error("Exhausted max retries for delete state RPC for 
key {} without success.", partitionKey()); diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java index f5d9a3cfb30..86b2d506376 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java @@ -207,6 +207,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord @Override public void onLoaded(MetadataImage newImage) { + this.metadataImage = newImage; coordinatorMetrics.activateMetricsShard(metricsShard); }