This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed3af721240 KAFKA-19928: Added retry and backoff mechanism in NetworkPartitionMetadataClient (#21001)
ed3af721240 is described below

commit ed3af7212405dd5294733ad33d192f12dc885fc2
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Dec 1 02:49:10 2025 +0530

    KAFKA-19928: Added retry and backoff mechanism in NetworkPartitionMetadataClient (#21001)
    
    Currently, if a ListOffsets request fails in
    NetworkPartitionMetadataClient for any reason, the corresponding future
    is completed immediately, without any retries. Moreover, the
    NetworkClient and InterBrokerSendThread are loaded lazily in the
    NetworkPartitionMetadataClient on the arrival of the first request, so
    the first request is enqueued in the NetworkClient before the
    connection can be established and therefore always fails. To address
    this, this PR introduces a retry mechanism with an upper limit on the
    number of retry attempts, as well as exponential backoff between
    successive retries.
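    
    In outline, the new error-handling path behaves like the condensed
    sketch below (illustrative only; the names follow the code added in
    this commit):
    
        ExponentialBackoffManager backoffManager = pendingRequest.backoffManager();
        if (shouldRetry && backoffManager.canAttempt()) {
            backoffManager.incrementAttempt();
            long backoffMs = backoffManager.backOff();
            timer.add(new RetryTimerTask(backoffMs, pendingRequest));
            return true; // futures stay pending until the retry resolves
        }
        // Non-retriable error, or retries exhausted: fail all futures.
        partitionFutures.forEach((tp, future) ->
            future.complete(new OffsetResponse(-1, error)));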
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
     <[email protected]>, Sushant Mahajan <[email protected]>
---
 .../common/utils/ExponentialBackoffManager.java    |  56 ++++
 .../utils/ExponentialBackoffManagerTest.java       | 104 +++++++
 .../src/main/scala/kafka/server/BrokerServer.scala |   3 +-
 .../group/NetworkPartitionMetadataClient.java      | 108 +++++++-
 .../group/NetworkPartitionMetadataClientTest.java  | 304 +++++++++++++++++----
 .../share/persister/PersisterStateManager.java     |  90 +++---
 6 files changed, 553 insertions(+), 112 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoffManager.java b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoffManager.java
new file mode 100644
index 00000000000..87e730f9b76
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoffManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+/**
+ * Manages retry attempts and exponential backoff for requests.
+ */
+public class ExponentialBackoffManager {
+    private final int maxAttempts;
+    private int attempts;
+    private final ExponentialBackoff backoff;
+
+    public ExponentialBackoffManager(int maxAttempts, long initialInterval, 
int multiplier, long maxInterval, double jitter) {
+        this.maxAttempts = maxAttempts;
+        this.backoff = new ExponentialBackoff(
+            initialInterval,
+            multiplier,
+            maxInterval,
+            jitter);
+    }
+
+    public void incrementAttempt() {
+        attempts++;
+    }
+
+    public void resetAttempts() {
+        attempts = 0;
+    }
+
+    public boolean canAttempt() {
+        return attempts < maxAttempts;
+    }
+
+    public long backOff() {
+        return this.backoff.backoff(attempts);
+    }
+
+    public int attempts() {
+        return attempts;
+    }
+}
\ No newline at end of file
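
A usage sketch for the new class (hypothetical caller, not part of this
commit; trySend() is a stand-in for any fallible operation). With 5 attempts,
a 100 ms initial interval, multiplier 2, a 1000 ms cap and no jitter, the
delays come out as 100, 200, 400, 800 and 1000 ms (the last capped):

    static void sendWithRetries() throws InterruptedException {
        ExponentialBackoffManager manager =
            new ExponentialBackoffManager(5, 100L, 2, 1_000L, 0.0);
        while (manager.canAttempt()) {
            if (trySend()) {                  // hypothetical operation
                manager.resetAttempts();      // success: start fresh next time
                return;
            }
            long delayMs = manager.backOff(); // 100, 200, 400, 800, 1000 ms
            manager.incrementAttempt();
            Thread.sleep(delayMs);            // blocking sketch; the client uses a Timer instead
        }
        throw new IllegalStateException("retries exhausted");
    }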
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffManagerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffManagerTest.java
new file mode 100644
index 00000000000..87fe0985858
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffManagerTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ExponentialBackoffManagerTest {
+
+    private static final ArrayList<Long> BACKOFF_LIST = new 
ArrayList<>(List.of(100L, 200L, 400L, 800L, 1600L));
+
+    @Test
+    public void testInitialState() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+            5, 100, 2, 1000, 0.0);
+        assertEquals(0, manager.attempts());
+        assertTrue(manager.canAttempt());
+    }
+
+    @Test
+    public void testIncrementAttempt() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+                5, 100, 2, 1000, 0.0);
+        assertEquals(0, manager.attempts());
+        manager.incrementAttempt();
+        assertEquals(1, manager.attempts());
+    }
+
+    @Test
+    public void testResetAttempts() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+                5, 100, 2, 1000, 0.0);
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        assertEquals(3, manager.attempts());
+        
+        manager.resetAttempts();
+        assertEquals(0, manager.attempts());
+        assertTrue(manager.canAttempt());
+    }
+
+    @Test
+    public void testCanAttempt() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+                3, 100, 2, 1000, 0.0);
+        // Initially can attempt
+        assertTrue(manager.canAttempt());
+        assertEquals(0, manager.attempts());
+
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        manager.incrementAttempt();
+        // After all retry attempts are exhausted
+        assertFalse(manager.canAttempt());
+        assertEquals(3, manager.attempts());
+    }
+
+    @Test
+    public void testBackOffWithoutJitter() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+                5, 100, 2, 1000, 0.0);
+        for (int i = 0; i < 5; i++) {
+            long backoff = manager.backOff();
+            // Without jitter, backoff equals initial * 2^attempts, capped at max.
+            assertEquals(Math.min(1000L, BACKOFF_LIST.get(i)), backoff);
+            manager.incrementAttempt();
+        }
+    }
+
+    @Test
+    public void testBackOffWithJitter() {
+        ExponentialBackoffManager manager = new ExponentialBackoffManager(
+                5, 100, 2, 1000, 0.2);
+        for (int i = 0; i < 5; i++) {
+            long backoff = manager.backOff();
+            // with jitter, the backoff values should be within 20% of the 
expected value.
+            assertTrue(backoff >= 0.8 * Math.min(1000L, BACKOFF_LIST.get(i)));
+            assertTrue(backoff <= 1.2 * Math.min(1000L, BACKOFF_LIST.get(i)));
+            manager.incrementAttempt();
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 3cba40491cd..70e18a626a3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -635,7 +635,8 @@ class BrokerServer(
         new LogContext(s"[NetworkPartitionMetadataClient 
broker=${config.brokerId}]")
       ),
       Time.SYSTEM,
-      config.interBrokerListenerName()
+      config.interBrokerListenerName(),
+      new SystemTimerReaper("network-partition-metadata-client-reaper", new 
SystemTimer("network-partition-metadata-client"))
     )
   }
 
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
index 355b0d2fb21..56297c408f0 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClient.java
@@ -30,10 +30,14 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.utils.ExponentialBackoffManager;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.util.InterBrokerSendThread;
 import org.apache.kafka.server.util.RequestAndCompletionHandler;
+import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,20 +58,42 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
 
     private static final Logger log = 
LoggerFactory.getLogger(NetworkPartitionMetadataClient.class);
 
+    private static final long REQUEST_BACKOFF_MS = 1_000L;
+    private static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int MAX_RETRY_ATTEMPTS = 5;
+
     private final MetadataCache metadataCache;
     private final Supplier<KafkaClient> networkClientSupplier;
     private final Time time;
     private final ListenerName listenerName;
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private volatile SendThread sendThread;
+    private final Timer timer;
 
     public NetworkPartitionMetadataClient(MetadataCache metadataCache,
                                           Supplier<KafkaClient> 
networkClientSupplier,
-                                          Time time, ListenerName 
listenerName) {
+                                          Time time, ListenerName 
listenerName, Timer timer) {
+        if (metadataCache == null) {
+            throw new IllegalArgumentException("MetadataCache must not be 
null.");
+        }
+        if (networkClientSupplier == null) {
+            throw new IllegalArgumentException("NetworkClientSupplier must not 
be null.");
+        }
+        if (time == null) {
+            throw new IllegalArgumentException("Time must not be null.");
+        }
+        if (listenerName == null) {
+            throw new IllegalArgumentException("ListenerName must not be 
null.");
+        }
+        if (timer == null) {
+            throw new IllegalArgumentException("Timer must not be null.");
+        }
+
         this.metadataCache = metadataCache;
         this.networkClientSupplier = networkClientSupplier;
         this.time = time;
         this.listenerName = listenerName;
+        this.timer = timer;
     }
 
     @Override
@@ -125,6 +151,7 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
     public void close() {
         // Only close sendThread if it was initialized. Note, close is called
         // only during broker shutdown, so no further synchronization is
         // needed here.
+        Utils.closeQuietly(timer, "NetworkPartitionMetadataClient timer");
         if (!initialized.get()) {
             return;
         }
@@ -186,14 +213,18 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
      * Handles the response from a ListOffsets request.
      */
     // Visible for Testing.
-    void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> 
partitionFutures, ClientResponse clientResponse) {
+    void handleResponse(PendingRequest pendingRequest, ClientResponse 
clientResponse) {
         // Handle error responses first
-        if (maybeHandleErrorResponse(partitionFutures, clientResponse)) {
+        if (maybeHandleErrorResponse(pendingRequest, clientResponse)) {
             return;
         }
 
         log.debug("ListOffsets response received successfully - {}", 
clientResponse);
+        // Reset retry attempts on success
+        pendingRequest.backoffManager().resetAttempts();
+        
         ListOffsetsResponse response = (ListOffsetsResponse) 
clientResponse.responseBody();
+        Map<TopicPartition, CompletableFuture<OffsetResponse>> 
partitionFutures = pendingRequest.futures();
 
         for (ListOffsetsTopicResponse topicResponse : response.topics()) {
             String topicName = topicResponse.name();
@@ -216,11 +247,14 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
     }
 
     /**
-     * Handles error responses by completing all associated futures with an 
error. Returns true if an error was
-     * handled. Otherwise, returns false.
+     * Handles error responses by completing all associated futures with an 
error or retrying the request.
+     * Returns true if an error was handled. Otherwise, returns false.
      */
-    private boolean maybeHandleErrorResponse(Map<TopicPartition, 
CompletableFuture<OffsetResponse>> partitionFutures, ClientResponse 
clientResponse) {
+    private boolean maybeHandleErrorResponse(PendingRequest pendingRequest, 
ClientResponse clientResponse) {
+        Map<TopicPartition, CompletableFuture<OffsetResponse>> 
partitionFutures = pendingRequest.futures();
         Errors error;
+        boolean shouldRetry = false;
+        
         if (clientResponse == null) {
             log.error("Response for ListOffsets for topicPartitions: {} is 
null", partitionFutures.keySet());
             error = Errors.UNKNOWN_SERVER_ERROR;
@@ -231,11 +265,13 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
             log.error("Version mismatch exception", 
clientResponse.versionMismatch());
             error = Errors.UNKNOWN_SERVER_ERROR;
         } else if (clientResponse.wasDisconnected()) {
-            log.error("Response for ListOffsets for TopicPartitions: {} was 
disconnected - {}.", partitionFutures.keySet(), clientResponse);
+            log.debug("Response for ListOffsets for TopicPartitions: {} was 
disconnected - {}.", partitionFutures.keySet(), clientResponse);
             error = Errors.NETWORK_EXCEPTION;
+            shouldRetry = true;
         } else if (clientResponse.wasTimedOut()) {
-            log.error("Response for ListOffsets for TopicPartitions: {} timed 
out - {}.", partitionFutures.keySet(), clientResponse);
+            log.debug("Response for ListOffsets for TopicPartitions: {} timed 
out - {}.", partitionFutures.keySet(), clientResponse);
             error = Errors.REQUEST_TIMED_OUT;
+            shouldRetry = true;
         } else if (!clientResponse.hasResponse()) {
             log.error("Response for ListOffsets for TopicPartitions: {} has no 
response - {}.", partitionFutures.keySet(), clientResponse);
             error = Errors.UNKNOWN_SERVER_ERROR;
@@ -244,6 +280,23 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
             return false;
         }
 
+        // For retriable errors (disconnected or timed out), attempt retry if 
possible
+        if (shouldRetry) {
+            ExponentialBackoffManager backoffManager = 
pendingRequest.backoffManager();
+            if (backoffManager.canAttempt()) {
+                backoffManager.incrementAttempt();
+                long backoffMs = backoffManager.backOff();
+                log.debug("Retrying ListOffsets request for TopicPartitions: 
{} after {} ms (attempt {}/{})",
+                    partitionFutures.keySet(), backoffMs, 
backoffManager.attempts(), MAX_RETRY_ATTEMPTS);
+                timer.add(new RetryTimerTask(backoffMs, pendingRequest));
+                return true;
+            } else {
+                log.error("Exhausted max retries ({}) for ListOffsets request 
for TopicPartitions: {}",
+                    MAX_RETRY_ATTEMPTS, partitionFutures.keySet());
+            }
+        }
+
+        // Complete all futures with error (either non-retriable error or 
exhausted retries)
         partitionFutures.forEach((tp, future) -> future.complete(new 
OffsetResponse(-1, error)));
         return true;
     }
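
With the constants above (REQUEST_BACKOFF_MS = 1 s, REQUEST_BACKOFF_MAX_MS =
30 s, and base 2 with 20% jitter from CommonClientConfigs), and since the
handler increments the attempt counter before computing the delay, the five
retries fire roughly 2 s, 4 s, 8 s, 16 s and 30 s (capped from 32 s) after
each failure, with each delay perturbed by up to 20% jitter.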
@@ -251,9 +304,39 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
     /**
      * Tracks a pending ListOffsets request and its associated futures.
      */
-    private record PendingRequest(Node node,
-                                  Map<TopicPartition, 
CompletableFuture<OffsetResponse>> futures,
-                                  ListOffsetsRequest.Builder requestBuilder) {
+    // Visible for testing.
+    record PendingRequest(Node node,
+                          Map<TopicPartition, 
CompletableFuture<OffsetResponse>> futures,
+                          ListOffsetsRequest.Builder requestBuilder,
+                          ExponentialBackoffManager backoffManager) {
+        PendingRequest(Node node,
+                      Map<TopicPartition, CompletableFuture<OffsetResponse>> 
futures,
+                      ListOffsetsRequest.Builder requestBuilder) {
+            this(node, futures, requestBuilder, new ExponentialBackoffManager(
+                MAX_RETRY_ATTEMPTS,
+                REQUEST_BACKOFF_MS,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                REQUEST_BACKOFF_MAX_MS,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER));
+        }
+    }
+
+    /**
+     * Timer task for retrying failed requests after backoff.
+     */
+    private final class RetryTimerTask extends TimerTask {
+        private final PendingRequest pendingRequest;
+
+        RetryTimerTask(long delayMs, PendingRequest pendingRequest) {
+            super(delayMs);
+            this.pendingRequest = pendingRequest;
+        }
+
+        @Override
+        public void run() {
+            sendThread.enqueue(pendingRequest);
+            sendThread.wakeup();
+        }
     }
 
     private class SendThread extends InterBrokerSendThread {
@@ -286,8 +369,7 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
                     time.hiResClockMs(),
                     current.node,
                     requestBuilder,
-                    response -> handleResponse(current.futures, response)
-                );
+                    response -> handleResponse(current, response));
 
                 requests.add(requestHandler);
             }
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
index 49e4bf8c783..98528de556f 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NetworkPartitionMetadataClientTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.common.Node;
@@ -31,8 +32,12 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.ExponentialBackoffManager;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.server.util.timer.Timer;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -63,18 +68,27 @@ class NetworkPartitionMetadataClientTest {
     private static final MockTime MOCK_TIME = new MockTime();
     private static final MetadataCache METADATA_CACHE = 
mock(MetadataCache.class);
     private static final Supplier<KafkaClient> KAFKA_CLIENT_SUPPLIER = () -> 
mock(KafkaClient.class);
+    private static final Timer MOCK_TIMER = new MockTimer(MOCK_TIME);
     private static final String HOST = "localhost";
     private static final int PORT = 9092;
     private static final ListenerName LISTENER_NAME = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);
     private static final String TOPIC = "test-topic";
     private static final int PARTITION = 0;
     private static final Node LEADER_NODE = new Node(1, HOST, PORT);
+    private static final int MAX_RETRY_ATTEMPTS = 2;
+    private static final long REQUEST_BACKOFF_MS = 1_000L;
+    private static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
+    private static final int RETRY_BACKOFF_EXP_BASE = 
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
+    private static final double RETRY_BACKOFF_JITTER = 
CommonClientConfigs.RETRY_BACKOFF_JITTER;
+
 
     private NetworkPartitionMetadataClient networkPartitionMetadataClient;
 
     private static class NetworkPartitionMetadataClientBuilder {
         private MetadataCache metadataCache = METADATA_CACHE;
         private Supplier<KafkaClient> kafkaClientSupplier = 
KAFKA_CLIENT_SUPPLIER;
+        private Time time = MOCK_TIME;
+        private Timer timer = MOCK_TIMER;
 
         NetworkPartitionMetadataClientBuilder withMetadataCache(MetadataCache 
metadataCache) {
             this.metadataCache = metadataCache;
@@ -86,12 +100,22 @@ class NetworkPartitionMetadataClientTest {
             return this;
         }
 
+        NetworkPartitionMetadataClientBuilder withTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        NetworkPartitionMetadataClientBuilder withTimer(Timer timer) {
+            this.timer = timer;
+            return this;
+        }
+
         static NetworkPartitionMetadataClientBuilder builder() {
             return new NetworkPartitionMetadataClientBuilder();
         }
 
         NetworkPartitionMetadataClient build() {
-            return new NetworkPartitionMetadataClient(metadataCache, 
kafkaClientSupplier, MOCK_TIME, LISTENER_NAME);
+            return new NetworkPartitionMetadataClient(metadataCache, 
kafkaClientSupplier, time, LISTENER_NAME, timer);
         }
     }
 
@@ -273,11 +297,16 @@ class NetworkPartitionMetadataClientTest {
         CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
         Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
             tp,
-            partitionFuture
-        );
+            partitionFuture);
         networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder().build();
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder);
         // Pass null as clientResponse.
-        networkPartitionMetadataClient.handleResponse(futures, null);
+        networkPartitionMetadataClient.handleResponse(pendingRequest, null);
         assertTrue(partitionFuture.isDone() && 
!partitionFuture.isCompletedExceptionally());
         PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
         assertEquals(-1, response.offset());
@@ -290,14 +319,19 @@ class NetworkPartitionMetadataClientTest {
         CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
         Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
             tp,
-            partitionFuture
-        );
+            partitionFuture);
         AuthenticationException authenticationException = new 
AuthenticationException("Test authentication exception");
         ClientResponse clientResponse = mock(ClientResponse.class);
         // Mock authentication exception in client response.
         
when(clientResponse.authenticationException()).thenReturn(authenticationException);
         networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder);
+        networkPartitionMetadataClient.handleResponse(pendingRequest, 
clientResponse);
         assertTrue(partitionFuture.isDone() && 
!partitionFuture.isCompletedExceptionally());
         PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
         assertEquals(-1, response.offset());
@@ -310,64 +344,26 @@ class NetworkPartitionMetadataClientTest {
         CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
         Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
             tp,
-            partitionFuture
-        );
+            partitionFuture);
         UnsupportedVersionException unsupportedVersionException = new 
UnsupportedVersionException("Test unsupportedVersionException exception");
         ClientResponse clientResponse = mock(ClientResponse.class);
         when(clientResponse.authenticationException()).thenReturn(null);
         // Mock version mismatch exception in client response.
         
when(clientResponse.versionMismatch()).thenReturn(unsupportedVersionException);
         networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder);
+        networkPartitionMetadataClient.handleResponse(pendingRequest, 
clientResponse);
         assertTrue(partitionFuture.isDone() && 
!partitionFuture.isCompletedExceptionally());
         PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
         assertEquals(-1, response.offset());
         assertEquals(Errors.UNKNOWN_SERVER_ERROR.code(), 
response.error().code());
     }
 
-    @Test
-    public void testListLatestOffsetsDisconnected() throws ExecutionException, 
InterruptedException {
-        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
-        CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
-        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
-            tp,
-            partitionFuture
-        );
-        ClientResponse clientResponse = mock(ClientResponse.class);
-        when(clientResponse.authenticationException()).thenReturn(null);
-        when(clientResponse.versionMismatch()).thenReturn(null);
-        // Mock disconnected in client response.
-        when(clientResponse.wasDisconnected()).thenReturn(true);
-        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
-        assertTrue(partitionFuture.isDone() && 
!partitionFuture.isCompletedExceptionally());
-        PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
-        assertEquals(-1, response.offset());
-        assertEquals(Errors.NETWORK_EXCEPTION.code(), response.error().code());
-    }
-
-    @Test
-    public void testListLatestOffsetsTimedOut() throws ExecutionException, 
InterruptedException {
-        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
-        CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
-        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
-            tp,
-            partitionFuture
-        );
-        ClientResponse clientResponse = mock(ClientResponse.class);
-        when(clientResponse.authenticationException()).thenReturn(null);
-        when(clientResponse.versionMismatch()).thenReturn(null);
-        when(clientResponse.wasDisconnected()).thenReturn(false);
-        // Mock timed out in client response.
-        when(clientResponse.wasTimedOut()).thenReturn(true);
-        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder().build();
-        networkPartitionMetadataClient.handleResponse(futures, clientResponse);
-        assertTrue(partitionFuture.isDone() && 
!partitionFuture.isCompletedExceptionally());
-        PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
-        assertEquals(-1, response.offset());
-        assertEquals(Errors.REQUEST_TIMED_OUT.code(), response.error().code());
-    }
-
     @Test
     public void testListLatestOffsetsMultiplePartitionsSameLeader() throws 
ExecutionException, InterruptedException {
         TopicPartition tp1 = new TopicPartition(TOPIC, PARTITION);
@@ -969,4 +965,206 @@ class NetworkPartitionMetadataClientTest {
         // Verify supplier was still only called once (not again)
         assertEquals(1, supplierCallCount[0]);
     }
+
+    @Test
+    public void testRetryOnDisconnect() {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(true);
+
+        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new 
ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+        assertEquals(0, exponentialBackoffManager.attempts());
+
+        // Handle disconnected response
+        networkPartitionMetadataClient.handleResponse(pendingRequest, 
clientResponse);
+
+        // Verify that a timer entry is present for retry
+        assertEquals(1, timer.size());
+        assertEquals(1, exponentialBackoffManager.attempts());
+        // Future should not be completed yet since retry is scheduled
+        assertFalse(partitionFuture.isDone());
+    }
+
+    @Test
+    public void testRetryOnTimeout() {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(false);
+        when(clientResponse.wasTimedOut()).thenReturn(true);
+
+        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new 
ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+        assertEquals(0, exponentialBackoffManager.attempts());
+
+        // Handle timeout response
+        networkPartitionMetadataClient.handleResponse(pendingRequest, 
clientResponse);
+
+        // Verify that a timer entry is present for retry
+        assertEquals(1, timer.size());
+        assertEquals(1, exponentialBackoffManager.attempts());
+        // Future should not be completed yet since retry is scheduled
+        assertFalse(partitionFuture.isDone());
+    }
+
+    @Test
+    public void testMaxRetryAttemptsExhaustedOnDisconnect() throws 
ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(true);
+
+        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new 
ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+
+        // Exhaust all retry attempts by incrementing up to MAX_RETRY_ATTEMPTS
+        for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+            exponentialBackoffManager.incrementAttempt();
+        }
+
+        // Verify that attempts are exhausted
+        assertFalse(exponentialBackoffManager.canAttempt());
+
+        // Handle disconnected response with exhausted retries
+        networkPartitionMetadataClient.handleResponse(pendingRequest, 
clientResponse);
+
+        // Verify that no timer entry is added (max retries exhausted)
+        assertEquals(0, timer.size());
+        // Verify that future is completed with error
+        assertTrue(partitionFuture.isDone());
+        PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.NETWORK_EXCEPTION.code(), response.error().code());
+    }
+
+    @Test
+    public void testMaxRetryAttemptsExhaustedOnTimeout() throws 
ExecutionException, InterruptedException {
+        TopicPartition tp = new TopicPartition(TOPIC, PARTITION);
+        CompletableFuture<PartitionMetadataClient.OffsetResponse> 
partitionFuture = new CompletableFuture<>();
+        Map<TopicPartition, 
CompletableFuture<PartitionMetadataClient.OffsetResponse>> futures = Map.of(
+            tp,
+            partitionFuture);
+        MockTimer timer = new MockTimer(MOCK_TIME);
+        ClientResponse clientResponse = mock(ClientResponse.class);
+        when(clientResponse.authenticationException()).thenReturn(null);
+        when(clientResponse.versionMismatch()).thenReturn(null);
+        when(clientResponse.wasDisconnected()).thenReturn(false);
+        when(clientResponse.wasTimedOut()).thenReturn(true);
+
+        networkPartitionMetadataClient = 
NetworkPartitionMetadataClientBuilder.builder()
+            .withTimer(timer)
+            .build();
+
+        ExponentialBackoffManager exponentialBackoffManager = new 
ExponentialBackoffManager(
+            MAX_RETRY_ATTEMPTS,
+            REQUEST_BACKOFF_MS,
+            RETRY_BACKOFF_EXP_BASE,
+            REQUEST_BACKOFF_MAX_MS,
+            RETRY_BACKOFF_JITTER);
+        Node node = mock(Node.class);
+        ListOffsetsRequest.Builder builder = 
mock(ListOffsetsRequest.Builder.class);
+
+        NetworkPartitionMetadataClient.PendingRequest pendingRequest = new 
NetworkPartitionMetadataClient.PendingRequest(
+            node,
+            futures,
+            builder,
+            exponentialBackoffManager);
+
+        // Initially, timer should be empty
+        assertEquals(0, timer.size());
+
+        // Exhaust all retry attempts by incrementing up to MAX_RETRY_ATTEMPTS
+        for (int i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+            exponentialBackoffManager.incrementAttempt();
+        }
+
+        // Verify that attempts are exhausted
+        assertFalse(exponentialBackoffManager.canAttempt(), "Retry attempts 
should be exhausted");
+
+        // Handle timeout response with exhausted retries
+        networkPartitionMetadataClient.handleResponse(pendingRequest, 
clientResponse);
+
+        // Verify that no timer entry is added (max retries exhausted)
+        assertEquals(0, timer.size(), "Timer should not have an entry when max 
retries are exhausted");
+        // Verify that future is completed with error
+        assertTrue(partitionFuture.isDone(), "Future should be completed when 
max retries are exhausted");
+        PartitionMetadataClient.OffsetResponse response = 
partitionFuture.get();
+        assertEquals(-1, response.offset());
+        assertEquals(Errors.REQUEST_TIMED_OUT.code(), response.error().code());
+    }
 }
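
A minimal sketch (hypothetical follow-up test, not part of this commit) of
how a scheduled retry task fires once the mock clock passes its delay. It
assumes MockTimer.advanceClock advances time and runs expired tasks, as in
other Kafka server tests, plus the usual JUnit asserts and
java.util.concurrent.atomic.AtomicBoolean:

    MockTime time = new MockTime();
    MockTimer timer = new MockTimer(time);
    AtomicBoolean fired = new AtomicBoolean(false);
    timer.add(new TimerTask(1_000L) {  // same delay style as RetryTimerTask
        @Override
        public void run() {
            fired.set(true);           // stand-in for re-enqueueing the request
        }
    });
    assertEquals(1, timer.size());
    timer.advanceClock(1_001L);        // move past the backoff delay
    assertTrue(fired.get());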
diff --git a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
index 43562ecc177..6c151686e9b 100644
--- a/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
+++ b/server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java
@@ -53,7 +53,7 @@ import org.apache.kafka.common.requests.ReadShareGroupStateSummaryRequest;
 import org.apache.kafka.common.requests.ReadShareGroupStateSummaryResponse;
 import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
 import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
-import org.apache.kafka.common.utils.ExponentialBackoff;
+import org.apache.kafka.common.utils.ExponentialBackoffManager;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.share.SharePartitionKey;
@@ -94,6 +94,8 @@ public class PersisterStateManager {
     public static final long REQUEST_BACKOFF_MS = 1_000L;
     public static final long REQUEST_BACKOFF_MAX_MS = 30_000L;
     private static final int MAX_FIND_COORD_ATTEMPTS = 5;
+    private static final int RETRY_BACKOFF_EXP_BASE = 
CommonClientConfigs.RETRY_BACKOFF_EXP_BASE;
+    private static final double RETRY_BACKOFF_JITTER = 
CommonClientConfigs.RETRY_BACKOFF_JITTER;
     private final Time time;
     private final Timer timer;
     private final ShareCoordinatorMetadataCacheHelper cacheHelper;
@@ -116,38 +118,6 @@ public class PersisterStateManager {
     // when generateRequests is called.
     private Runnable generateCallback;
 
-    private static class BackoffManager {
-        private final int maxAttempts;
-        private int attempts;
-        private final ExponentialBackoff backoff;
-
-        BackoffManager(int maxAttempts, long initialBackoffMs, long 
maxBackoffMs) {
-            this.maxAttempts = maxAttempts;
-            this.backoff = new ExponentialBackoff(
-                initialBackoffMs,
-                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
-                maxBackoffMs,
-                CommonClientConfigs.RETRY_BACKOFF_JITTER
-            );
-        }
-
-        void incrementAttempt() {
-            attempts++;
-        }
-
-        void resetAttempts() {
-            attempts = 0;
-        }
-
-        boolean canAttempt() {
-            return attempts < maxAttempts;
-        }
-
-        long backOff() {
-            return this.backoff.backoff(attempts);
-        }
-    }
-
     public enum RPCType {
         INITIALIZE,
         READ,
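
The removed inner BackoffManager maps one-to-one onto the shared utility;
only the multiplier and jitter, previously hard-coded, become constructor
arguments. A sketch of the equivalence, using the constants defined above:

    // Before (private inner class):
    new BackoffManager(maxAttempts, backoffMs, backoffMaxMs);
    // After (shared class; multiplier and jitter passed explicitly):
    new ExponentialBackoffManager(maxAttempts, backoffMs,
        RETRY_BACKOFF_EXP_BASE, backoffMaxMs, RETRY_BACKOFF_JITTER);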
@@ -219,7 +189,7 @@ public class PersisterStateManager {
      */
     public abstract class PersisterStateManagerHandler implements 
RequestCompletionHandler {
         protected Node coordinatorNode;
-        private final BackoffManager findCoordBackoff;
+        private final ExponentialBackoffManager findCoordBackoff;
         protected final Logger log;
         private Consumer<ClientResponse> onCompleteCallback;
         protected final SharePartitionKey partitionKey;
@@ -232,7 +202,12 @@ public class PersisterStateManager {
             long backoffMaxMs,
             int maxRPCRetryAttempts
         ) {
-            this.findCoordBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+            this.findCoordBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
             this.onCompleteCallback = response -> {
             }; // noop
             partitionKey = SharePartitionKey.getInstance(groupId, topicId, 
partition);
@@ -522,7 +497,7 @@ public class PersisterStateManager {
         private final int stateEpoch;
         private final long startOffset;
         private final CompletableFuture<InitializeShareGroupStateResponse> 
result;
-        private final BackoffManager initializeStateBackoff;
+        private final ExponentialBackoffManager initializeStateBackoff;
 
         public InitializeStateHandler(
             String groupId,
@@ -539,7 +514,12 @@ public class PersisterStateManager {
             this.stateEpoch = stateEpoch;
             this.startOffset = startOffset;
             this.result = result;
-            this.initializeStateBackoff = new 
BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.initializeStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public InitializeStateHandler(
@@ -701,7 +681,7 @@ public class PersisterStateManager {
         private final int deliveryCompleteCount;
         private final List<PersisterStateBatch> batches;
         private final CompletableFuture<WriteShareGroupStateResponse> result;
-        private final BackoffManager writeStateBackoff;
+        private final ExponentialBackoffManager writeStateBackoff;
 
         public WriteStateHandler(
             String groupId,
@@ -724,7 +704,12 @@ public class PersisterStateManager {
             this.deliveryCompleteCount = deliveryCompleteCount;
             this.batches = batches;
             this.result = result;
-            this.writeStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+            this.writeStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public WriteStateHandler(
@@ -887,7 +872,7 @@ public class PersisterStateManager {
     public class ReadStateHandler extends PersisterStateManagerHandler {
         private final int leaderEpoch;
         private final CompletableFuture<ReadShareGroupStateResponse> result;
-        private final BackoffManager readStateBackoff;
+        private final ExponentialBackoffManager readStateBackoff;
 
         public ReadStateHandler(
             String groupId,
@@ -903,7 +888,12 @@ public class PersisterStateManager {
             super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
             this.leaderEpoch = leaderEpoch;
             this.result = result;
-            this.readStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+            this.readStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public ReadStateHandler(
@@ -1057,7 +1047,7 @@ public class PersisterStateManager {
     public class ReadStateSummaryHandler extends PersisterStateManagerHandler {
         private final int leaderEpoch;
         private final CompletableFuture<ReadShareGroupStateSummaryResponse> 
result;
-        private final BackoffManager readStateSummaryBackoff;
+        private final ExponentialBackoffManager readStateSummaryBackoff;
 
         public ReadStateSummaryHandler(
             String groupId,
@@ -1073,7 +1063,12 @@ public class PersisterStateManager {
             super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
             this.leaderEpoch = leaderEpoch;
             this.result = result;
-            this.readStateSummaryBackoff = new 
BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs);
+            this.readStateSummaryBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public ReadStateSummaryHandler(
@@ -1226,7 +1221,7 @@ public class PersisterStateManager {
 
     public class DeleteStateHandler extends PersisterStateManagerHandler {
         private final CompletableFuture<DeleteShareGroupStateResponse> result;
-        private final BackoffManager deleteStateBackoff;
+        private final ExponentialBackoffManager deleteStateBackoff;
 
         public DeleteStateHandler(
             String groupId,
@@ -1239,7 +1234,12 @@ public class PersisterStateManager {
         ) {
             super(groupId, topicId, partition, backoffMs, backoffMaxMs, 
maxRPCRetryAttempts);
             this.result = result;
-            this.deleteStateBackoff = new BackoffManager(maxRPCRetryAttempts, 
backoffMs, backoffMaxMs);
+            this.deleteStateBackoff = new ExponentialBackoffManager(
+                maxRPCRetryAttempts,
+                backoffMs,
+                RETRY_BACKOFF_EXP_BASE,
+                backoffMaxMs,
+                RETRY_BACKOFF_JITTER);
         }
 
         public DeleteStateHandler(
