This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e0fb3015da0 [cleanup] Refactored Backoff to be more consistent and intuitive to use (#25278)
e0fb3015da0 is described below

commit e0fb3015da0eed80fa62b7466e6052f83bf8e5ae
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 9 03:18:10 2026 -0700

    [cleanup] Refactored Backoff to be more consistent and intuitive to use (#25278)
---
 .../pulsar/broker/resources/BaseResources.java     |  14 +-
 .../pulsar/broker/service/AbstractReplicator.java  |  11 +-
 .../service/PulsarMetadataEventSynchronizer.java   |   9 +-
 .../pulsar/broker/service/TopicListService.java    |  14 +-
 .../PersistentDispatcherMultipleConsumers.java     |  21 +-
 ...rsistentDispatcherMultipleConsumersClassic.java |  11 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  12 +-
 .../service/persistent/PersistentReplicator.java   |   9 +-
 .../pendingack/impl/PendingAckHandleImpl.java      |   8 +-
 .../common/naming/NamespaceBundleFactory.java      |   6 +-
 .../pulsar/client/impl/ConnectionHandlerTest.java  |  25 ++-
 .../apache/pulsar/client/impl/RetryUtilTest.java   |  23 +--
 .../client/impl/BinaryProtoLookupService.java      |  12 +-
 .../pulsar/client/impl/ConnectionHandler.java      |   4 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  44 ++--
 .../apache/pulsar/client/impl/ProducerImpl.java    |  13 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  13 +-
 .../client/impl/PulsarServiceNameResolver.java     |  14 +-
 .../pulsar/client/impl/TopicListWatcher.java       |  15 +-
 .../client/impl/TransactionMetaStoreHandler.java   |  43 ++--
 .../org/apache/pulsar/client/util/RetryUtil.java   |   6 +-
 .../pulsar/client/impl/ConsumerImplTest.java       |   4 +-
 .../org/apache/pulsar/common/util/Backoff.java     | 226 +++++++++++++++------
 .../apache/pulsar/common/util/BackoffBuilder.java  |  65 ------
 .../org/apache/pulsar/common/util/BackoffTest.java | 143 ++++++-------
 .../pulsar/metadata/api/MetadataCacheConfig.java   |  24 +--
 .../metadata/cache/impl/MetadataCacheImpl.java     |  14 +-
 .../coordination/impl/ResourceLockImpl.java        |   8 +-
 .../apache/pulsar/metadata/MetadataCacheTest.java  |  57 +++---
 29 files changed, 449 insertions(+), 419 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index f31e5a6b78a..636a8db4856 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataCacheConfig;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -60,8 +62,10 @@ public class BaseResources<T> {
     public BaseResources(MetadataStore store, Class<T> clazz, int 
operationTimeoutSec) {
         this.store = store;
         this.cache = store.getMetadataCache(clazz, 
MetadataCacheConfig.builder()
-                
.retryBackoff(MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.setMandatoryStop(operationTimeoutSec,
-                        TimeUnit.SECONDS))
+                .retryBackoff(Backoff.builder()
+                        .initialDelay(Duration.ofMillis(5))
+                        .maxBackoff(Duration.ofSeconds(3))
+                        
.mandatoryStop(Duration.ofSeconds(operationTimeoutSec)))
                 .build());
         this.operationTimeoutSec = operationTimeoutSec;
     }
@@ -69,8 +73,10 @@ public class BaseResources<T> {
     public BaseResources(MetadataStore store, TypeReference<T> typeRef, int 
operationTimeoutSec) {
         this.store = store;
         this.cache = store.getMetadataCache(typeRef, 
MetadataCacheConfig.builder()
-                
.retryBackoff(MetadataCacheConfig.DEFAULT_RETRY_BACKOFF_BUILDER.setMandatoryStop(operationTimeoutSec,
-                        TimeUnit.SECONDS))
+                .retryBackoff(Backoff.builder()
+                        .initialDelay(Duration.ofMillis(5))
+                        .maxBackoff(Duration.ofSeconds(3))
+                        
.mandatoryStop(Duration.ofSeconds(operationTimeoutSec)))
                 .build());
         this.operationTimeoutSec = operationTimeoutSec;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index c7a36ad1b21..3477ab793f3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -71,8 +71,7 @@ public abstract class AbstractReplicator implements 
Replicator {
     protected final int producerQueueSize;
     protected final ProducerBuilder<byte[]> producerBuilder;
 
-    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 
1, TimeUnit.MINUTES, 0,
-            TimeUnit.MILLISECONDS);
+    protected final Backoff backOff = Backoff.create();
 
     protected final String replicatorPrefix;
 
@@ -209,7 +208,7 @@ public abstract class AbstractReplicator implements 
Replicator {
         }).exceptionally(ex -> {
             Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
             if (setDisconnectedRes.getLeft()) {
-                long waitTimeMs = backOff.next();
+                long waitTimeMs = backOff.next().toMillis();
                 log.warn("[{}] Failed to create remote producer ({}), retrying 
in {} s",
                         replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
                 // BackOff before retrying
@@ -238,7 +237,7 @@ public abstract class AbstractReplicator implements 
Replicator {
      * If we start a producer immediately, we will get a conflict 
producer(same name producer) registered error.
      */
     protected void delayStartProducerAfterDisconnected() {
-        long waitTimeMs = backOff.next();
+        long waitTimeMs = backOff.next().toMillis();
         if (log.isDebugEnabled()) {
             log.debug(
                     "[{}] waiting for producer to close before attempting to 
reconnect, retrying in {} s",
@@ -364,7 +363,7 @@ public abstract class AbstractReplicator implements 
Replicator {
                      *   Nit: The better solution is creating a {@link 
CompletableFuture} to trace the in-progress
                      *     creation and call 
"inProgressCreationFuture.thenApply(closeProducer())".
                      */
-                    long waitTimeMs = backOff.next();
+                    long waitTimeMs = backOff.next().toMillis();
                     brokerService.executor().schedule(() -> 
closeProducerAsync(true),
                             waitTimeMs, TimeUnit.MILLISECONDS);
                 } else {
@@ -415,7 +414,7 @@ public abstract class AbstractReplicator implements 
Replicator {
         return future.thenRun(() -> {
             actionAfterClosed.run();
         }).exceptionally(ex -> {
-            long waitTimeMs = backOff.next();
+            long waitTimeMs = backOff.next().toMillis();
             log.warn(
                     "[{}] Exception: '{}' occurred while trying to close the 
producer. Replicator state: {}."
                             + " Retrying again in {} s.",
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
index a5fac333ae5..0eee0c25ff3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java
@@ -65,8 +65,7 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
     private volatile State state;
     public static final String SUBSCRIPTION_NAME = "metadata-syncer";
     private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
-    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 
1, TimeUnit.MINUTES, 0,
-            TimeUnit.MILLISECONDS);
+    protected final Backoff backOff = Backoff.create();
     private volatile CompletableFuture<Void> closeFuture;
 
     public enum State {
@@ -166,7 +165,7 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
                     });
                 }
             }).exceptionally(ex -> {
-                long waitTimeMs = backOff.next();
+                long waitTimeMs = backOff.next().toMillis();
                 log.warn("[{}] Failed to create producer ({}), retrying in {} 
s", topicName, ex.getMessage(),
                         waitTimeMs / 1000.0);
                 // BackOff before retrying
@@ -238,7 +237,7 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
                 });
             }
         }).exceptionally(ex -> {
-            long waitTimeMs = backOff.next();
+            long waitTimeMs = backOff.next().toMillis();
             log.warn("[{}] Failed to create consumer ({}), retrying in {} s", 
topicName, ex.getMessage(),
                     waitTimeMs / 1000.0);
             // BackOff before retrying
@@ -318,7 +317,7 @@ public class PulsarMetadataEventSynchronizer implements 
MetadataEventSynchronize
                 return;
             }
             // Retry.
-            long waitTimeMs = backOff.next();
+            long waitTimeMs = backOff.next().toMillis();
             log.warn("[{}] Exception: '{}' occurred while trying to close the 
{}. Retrying again in {} s.",
                     topicName, ex.getMessage(), 
asyncCloseable.getClass().getSimpleName(), waitTimeMs / 1000.0, ex);
             brokerService.executor().schedule(() -> 
closeResource(asyncCloseable, future), waitTimeMs,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
index 262c734ea95..eadb1f3969e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -320,10 +321,9 @@ public class TopicListService {
                 .concurrencyLevel(1)
                 .build();
         this.topicResources = pulsar.getPulsarResources().getTopicResources();
-        this.retryBackoff = new Backoff(
-                100, TimeUnit.MILLISECONDS,
-                25, TimeUnit.SECONDS,
-                0, TimeUnit.MILLISECONDS);
+        this.retryBackoff = Backoff.builder()
+                .maxBackoff(Duration.ofSeconds(25))
+                .build();
     }
 
     public void inactivate() {
@@ -471,7 +471,7 @@ public class TopicListService {
                 if (connection.isActive() && (unwrappedException instanceof 
AsyncSemaphore.PermitAcquireTimeoutException
                         || unwrappedException instanceof 
AsyncSemaphore.PermitAcquireQueueFullException)) {
                     // retry with backoff if permit acquisition fails due to 
timeout or queue full
-                    long retryAfterMillis = this.retryBackoff.next();
+                    long retryAfterMillis = 
this.retryBackoff.next().toMillis();
                     log.info("[{}] {} when initializing topic list watcher 
watcherId={} for namespace {}. "
                                     + "Retrying in {} " + "ms.", connection, 
unwrappedException.getMessage(), watcherId,
                             namespace, retryAfterMillis);
@@ -568,7 +568,7 @@ public class TopicListService {
                         && (unwrappedException instanceof 
AsyncSemaphore.PermitAcquireTimeoutException
                         || unwrappedException instanceof 
AsyncSemaphore.PermitAcquireQueueFullException)) {
                     // retry with backoff if permit acquisition fails due to 
timeout or queue full
-                    long retryAfterMillis = this.retryBackoff.next();
+                    long retryAfterMillis = 
this.retryBackoff.next().toMillis();
                     log.info("[{}] {} when updating topic list watcher 
watcherId={} for namespace {}. Retrying in {} "
                                     + "ms.", connection, 
unwrappedException.getMessage(), watcherId, namespace,
                             retryAfterMillis);
@@ -687,7 +687,7 @@ public class TopicListService {
                 // stop retrying and complete successfully
                 return CompletableFuture.completedFuture(null);
             }
-            long retryDelay = retryBackoff.next();
+            long retryDelay = retryBackoff.next().toMillis();
             retryCount.incrementAndGet();
             log.info("[{}] Cannot acquire direct memory tokens for sending {}. 
Retry {} in {} ms. {}", connection,
                     operationName, retryCount.get(), retryDelay, 
t.getMessage());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4f33e3e379b..a88fbf863eb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -23,6 +23,7 @@ import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -165,14 +166,14 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
         this.initializeDispatchRateLimiterIfNeeded();
         this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addEntryToReplay, subscription);
         ServiceConfiguration serviceConfiguration = 
topic.getBrokerService().pulsar().getConfiguration();
-        this.readFailureBackoff = new Backoff(
-                
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
-                TimeUnit.MILLISECONDS,
-                1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
-        retryBackoff = new Backoff(
-                
serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs(), 
TimeUnit.MILLISECONDS,
-                serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs(), 
TimeUnit.MILLISECONDS,
-                0, TimeUnit.MILLISECONDS);
+        this.readFailureBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(serviceConfiguration
+                        .getDispatcherReadFailureBackoffInitialTimeInMs()))
+                .build();
+        retryBackoff = Backoff.builder()
+                
.initialDelay(Duration.ofMillis(serviceConfiguration.getDispatcherRetryBackoffInitialTimeInMs()))
+                
.maxBackoff(Duration.ofMillis(serviceConfiguration.getDispatcherRetryBackoffMaxTimeInMs()))
+                .build();
     }
 
     @Override
@@ -515,7 +516,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractPersistentDis
     }
 
     protected synchronized void reScheduleReadWithBackoff() {
-        reScheduleReadInMs(retryBackoff.next());
+        reScheduleReadInMs(retryBackoff.next().toMillis());
     }
 
     protected void reScheduleReadInMs(long readAfterMs) {
@@ -1000,7 +1001,7 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractPersistentDis
     public synchronized void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 
         ReadType readType = (ReadType) ctx;
-        long waitTimeMillis = readFailureBackoff.next();
+        long waitTimeMillis = readFailureBackoff.next().toMillis();
 
         // Do not keep reading more entries if the cursor is already closed.
         if (exception instanceof 
ManagedLedgerException.CursorAlreadyClosedException) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
index 0746b7215b1..d828f338132 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java
@@ -22,6 +22,7 @@ import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAG
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -158,10 +159,10 @@ public class PersistentDispatcherMultipleConsumersClassic 
extends AbstractPersis
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.initializeDispatchRateLimiterIfNeeded();
         this.assignor = new SharedConsumerAssignor(this::getNextConsumer, 
this::addMessageToReplay, subscription);
-        this.readFailureBackoff = new Backoff(
-                
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
-                TimeUnit.MILLISECONDS,
-                1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+        this.readFailureBackoff = Backoff.builder()
+                
.initialDelay(Duration.ofMillis(topic.getBrokerService().pulsar().getConfiguration()
+                        .getDispatcherReadFailureBackoffInitialTimeInMs()))
+                .build();
     }
 
     @Override
@@ -858,7 +859,7 @@ public class PersistentDispatcherMultipleConsumersClassic 
extends AbstractPersis
     public synchronized void readEntriesFailed(ManagedLedgerException 
exception, Object ctx) {
 
         ReadType readType = (ReadType) ctx;
-        long waitTimeMillis = readFailureBackoff.next();
+        long waitTimeMillis = readFailureBackoff.next().toMillis();
 
         // Do not keep reading more entries if the cursor is already closed.
         if (exception instanceof 
ManagedLedgerException.CursorAlreadyClosedException) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 9e6ba93b9dc..2420b2107ea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -22,6 +22,7 @@ import static 
org.apache.bookkeeper.mledger.util.ManagedLedgerUtils.readEntriesW
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -87,10 +88,11 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         this.name = topic.getName() + " / " + (cursor.getName() != null ? 
Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
-        this.readFailureBackoff = new 
Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
-            TimeUnit.MILLISECONDS, 
serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
-            TimeUnit.MILLISECONDS, 
serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(),
-            TimeUnit.MILLISECONDS);
+        this.readFailureBackoff = Backoff.builder()
+            
.initialDelay(Duration.ofMillis(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs()))
+            
.maxBackoff(Duration.ofMillis(serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs()))
+            
.mandatoryStop(Duration.ofMillis(serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs()))
+            .build();
         this.redeliveryTracker = 
RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded();
     }
@@ -475,7 +477,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
             return;
         }
 
-        long waitTimeMillis = readFailureBackoff.next();
+        long waitTimeMillis = readFailureBackoff.next().toMillis();
 
         if (exception instanceof NoMoreEntriesToReadException) {
             if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index c1d73cd3891..d56f2147514 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -97,8 +98,10 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     protected int messageTTLInSeconds = 0;
 
-    private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    private final Backoff readFailureBackoff = Backoff.builder()
+            .initialDelay(Duration.ofSeconds(1))
+            .maxBackoff(Duration.ofMinutes(1))
+            .build();
 
     private final PersistentMessageExpiryMonitor expiryMonitor;
     // for connected subscriptions, message expiry will be checked if the 
backlog is greater than this threshold
@@ -508,7 +511,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         // Reduce read batch size to avoid flooding bookies with retries
         readBatchSize = 
topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
 
-        long waitTimeMillis = readFailureBackoff.next();
+        long waitTimeMillis = readFailureBackoff.next().toMillis();
 
         if (exception instanceof CursorAlreadyClosedException) {
             log.warn("[{}] Error reading entries because replicator is"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index 591842927f3..bc4c74ab3b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -25,6 +25,7 @@ import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWit
 import static 
org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -145,8 +146,9 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
 
     private final long pendingAckInitFailureBackoffInitialTimeInMs = 100;
 
-    public final Backoff backoff = new 
Backoff(pendingAckInitFailureBackoffInitialTimeInMs, TimeUnit.MILLISECONDS,
-            1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
+    public final Backoff backoff = Backoff.builder()
+            
.initialDelay(Duration.ofMillis(pendingAckInitFailureBackoffInitialTimeInMs))
+            .build();
 
     private final Timer transactionOpTimer;
 
@@ -958,7 +960,7 @@ public class PendingAckHandleImpl extends 
PendingAckHandleState implements Pendi
     public void exceptionHandleFuture(Throwable t) {
         if (isRetryableException(t)) {
             this.state = State.None;
-            long retryTime = backoff.next();
+            long retryTime = backoff.next().toMillis();
             log.warn("[{}][{}] Failed to init transaction pending ack. It will 
be retried in {} Ms",
                     persistentSubscription.getTopic().getName(), subName, 
retryTime, t);
             transactionOpTimer.newTimeout((timeout) -> init(), retryTime, 
TimeUnit.MILLISECONDS);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
index 69f5208ce67..bace2bf87a9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
@@ -138,14 +138,16 @@ public class NamespaceBundleFactory {
             future.completeExceptionally(e);
         } else {
             LOG.warn("Error loading bundle for {}. Retrying exception", 
namespace, e);
-            long retryDelay = backoff.next();
+            long retryDelay = backoff.next().toMillis();
             pulsar.getExecutor().schedule(() ->
                     doLoadBundles(namespace, future, backoff, retryDeadline), 
retryDelay, TimeUnit.MILLISECONDS);
         }
     }
 
     private static Backoff createBackoff() {
-        return new Backoff(100, TimeUnit.MILLISECONDS, 5, TimeUnit.SECONDS, 0, 
TimeUnit.MILLISECONDS);
+        return Backoff.builder()
+                .maxBackoff(Duration.ofSeconds(5))
+                .build();
     }
 
     private NamespaceBundles readBundles(NamespaceName namespace, 
LocalPolicies localPolicies, long version)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
index 96079ae6daa..77bfe6f8ded 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionHandlerTest.java
@@ -25,14 +25,12 @@ import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
@@ -45,9 +43,9 @@ import org.testng.annotations.Test;
 @Test(groups = "broker-impl")
 public class ConnectionHandlerTest extends ProducerConsumerBase {
 
-    private static final Backoff BACKOFF = new 
BackoffBuilder().setInitialTime(1, TimeUnit.MILLISECONDS)
-            .setMandatoryStop(1, TimeUnit.SECONDS)
-            .setMax(3, TimeUnit.SECONDS).create();
+    private static final Backoff BACKOFF = 
Backoff.builder().initialDelay(Duration.ofMillis(1))
+            .mandatoryStop(Duration.ofSeconds(1))
+            .maxBackoff(Duration.ofSeconds(3)).build();
     private ExecutorService executor;
 
     @BeforeClass(alwaysRun = true)
@@ -115,7 +113,7 @@ public class ConnectionHandlerTest extends 
ProducerConsumerBase {
 
         // 2. connectionFailed is called
         final ConnectionHandler handler2 = new ConnectionHandler(
-                new MockedHandlerState((PulsarClientImpl) pulsarClient, null), 
new MockedBackoff(),
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, null), 
createMockedBackoff(),
                 cnx -> CompletableFuture.completedFuture(null));
         FieldUtils.writeField(handler2, "duringConnect", duringConnect, true);
         handler2.grabCnx();
@@ -125,7 +123,7 @@ public class ConnectionHandlerTest extends 
ProducerConsumerBase {
 
         // 3. connectionOpened completes exceptionally
         final ConnectionHandler handler3 = new ConnectionHandler(
-                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), new MockedBackoff(),
+                new MockedHandlerState((PulsarClientImpl) pulsarClient, 
"my-topic"), createMockedBackoff(),
                 cnx -> FutureUtil.failedFuture(new RuntimeException("fail")));
         FieldUtils.writeField(handler3, "duringConnect", duringConnect, true);
         handler3.grabCnx();
@@ -146,11 +144,12 @@ public class ConnectionHandlerTest extends 
ProducerConsumerBase {
         }
     }
 
-    private static class MockedBackoff extends Backoff {
-
-        // Set a large backoff so that reconnection won't happen in tests
-        public MockedBackoff() {
-            super(1, TimeUnit.HOURS, 2, TimeUnit.HOURS, 1, TimeUnit.HOURS);
-        }
+    // Set a large backoff so that reconnection won't happen in tests
+    private static Backoff createMockedBackoff() {
+        return Backoff.builder()
+                .initialDelay(Duration.ofHours(1))
+                .maxBackoff(Duration.ofHours(2))
+                .mandatoryStop(Duration.ofHours(1))
+                .build();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index 9efd14df7d6..555c0ca0c70 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -20,15 +20,14 @@ package org.apache.pulsar.client.impl;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.Test;
 
@@ -42,11 +41,11 @@ public class RetryUtilTest {
         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
         CompletableFuture<Boolean> callback = new CompletableFuture<>();
         AtomicInteger atomicInteger = new AtomicInteger(0);
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMax(2000, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
-                .create();
+        Backoff backoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofMillis(2000))
+                .mandatoryStop(Duration.ofMillis(5000))
+                .build();
         RetryUtil.retryAsynchronously(() -> {
             CompletableFuture<Boolean> future = new CompletableFuture<>();
             atomicInteger.incrementAndGet();
@@ -66,11 +65,11 @@ public class RetryUtilTest {
         @Cleanup("shutdownNow")
         ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
         CompletableFuture<Boolean> callback = new CompletableFuture<>();
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(500, TimeUnit.MILLISECONDS)
-                .setMax(2000, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
-                .create();
+        Backoff backoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(500))
+                .maxBackoff(Duration.ofMillis(2000))
+                .mandatoryStop(Duration.ofMillis(5000))
+                .build();
         long start = System.currentTimeMillis();
         RetryUtil.retryAsynchronously(() ->
                 FutureUtil.failedFuture(new RuntimeException("fail")), backoff, executor, callback);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 1f66e65179a..2f121a80e15 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -25,6 +25,7 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import io.opentelemetry.api.common.Attributes;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.time.Duration;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -47,7 +48,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
@@ -358,11 +358,9 @@ public class BinaryProtoLookupService implements LookupService {
                                                                       
@Nullable Map<String, String> properties) {
         CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
         AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                .setMax(1, TimeUnit.MINUTES)
-                .create();
+        Backoff backoff = Backoff.builder()
+                .mandatoryStop(Duration.ofMillis(opTimeoutMs.get() * 2))
+                .build();
         getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
                 topicsPattern, topicsHash, properties);
         return topicsFuture;
@@ -404,7 +402,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }, lookupPinnedExecutor).exceptionally((e) -> {
-            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            long nextDelay = Math.min(backoff.next().toMillis(), remainingTime.get());
             if (nextDelay <= 0) {
                 getTopicsResultFuture.completeExceptionally(
                     new PulsarClientException.TimeoutException(
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index a4e262b35ae..01771fef9d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -179,7 +179,7 @@ public class ConnectionHandler {
                     state.topic, state.getHandlerName(), state.getState());
             return;
         }
-        long delayMs = backoff.next();
+        long delayMs = backoff.next().toMillis();
         log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s",
                 state.topic, state.getHandlerName(),
                 exception.getMessage(), delayMs / 1000.0);
@@ -208,7 +208,7 @@ public class ConnectionHandler {
                         state.topic, state.getHandlerName(), state.getState());
                 return;
             }
-            long delayMs = initialConnectionDelayMs.orElse(backoff.next());
+            long delayMs = initialConnectionDelayMs.orElseGet(() -> backoff.next().toMillis());
             log.info("[{}] [{}] Closed connection {} -- Will try again in {} s, hostUrl: {}",
                     state.topic, state.getHandlerName(), cnx.channel(), delayMs / 1000.0, hostUrl.orElse(null));
             state.client.timer().newTimeout(timeout -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 7091b05151e..868c45b277e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -39,6 +39,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -130,7 +131,6 @@ import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 import org.apache.pulsar.common.util.ExceptionHandler;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -371,12 +371,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         this.connectionHandler = new ConnectionHandler(this,
-                new BackoffBuilder()
-                        .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
-                                TimeUnit.NANOSECONDS)
-                        .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                        .create(),
+                Backoff.builder()
+                        .initialDelay(Duration.ofNanos(client.getConfiguration()
+                                .getInitialBackoffIntervalNanos()))
+                        .maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()))
+                        .build(),
                 this);
 
         this.topicName = TopicName.get(topic);
@@ -2459,15 +2458,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return producerSupplier.get();
         } else {
             // calculate backoff time for given failure count
-            Backoff backoff = new BackoffBuilder()
-                    .setInitialTime(100, TimeUnit.MILLISECONDS)
-                    .setMandatoryStop(client.getConfiguration().getOperationTimeoutMs() * 2,
-                            TimeUnit.MILLISECONDS)
-                    .setMax(1, TimeUnit.MINUTES)
-                    .create();
+            Backoff backoff = Backoff.builder()
+                    .mandatoryStop(Duration.ofMillis(client.getConfiguration().getOperationTimeoutMs() * 2))
+                    .build();
             long backoffTimeMillis = 0;
             for (int i = 0; i < failureCount; i++) {
-                backoffTimeMillis = backoff.next();
+                backoffTimeMillis = backoff.next().toMillis();
             }
             CompletableFuture<Producer<byte[]>> newProducer = new CompletableFuture<>();
             ScheduledExecutorService executor =
@@ -2569,11 +2565,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId,
                                                       Long seekTimestamp, String seekBy) {
         AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                .create();
+        Backoff backoff = Backoff.builder()
+                .maxBackoff(Duration.ofMillis(opTimeoutMs.get() * 2))
+                .build();
 
         if (!seekStatus.compareAndSet(SeekStatus.NOT_STARTED, SeekStatus.IN_PROGRESS)) {
             final String message = String.format(
@@ -2631,7 +2625,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return null;
             });
         } else {
-            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            long nextDelay = Math.min(backoff.next().toMillis(), remainingTime.get());
             if (nextDelay <= 0) {
                 failSeek(
                         new PulsarClientException.TimeoutException(
@@ -2838,11 +2832,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 }
 
         AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                .create();
+        Backoff backoff = Backoff.builder()
+                .maxBackoff(Duration.ofMillis(opTimeoutMs.get() * 2))
+                .build();
 
         CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture = new CompletableFuture<>();
 
@@ -2903,7 +2895,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 future.completeExceptionally(failReason);
                 return;
             }
-            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            long nextDelay = Math.min(backoff.next().toMillis(), remainingTime.get());
             if (nextDelay <= 0) {
                 future.completeExceptionally(
                     new PulsarClientException.TimeoutException(
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 89fa8ebb998..3d601465aac 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -48,6 +48,7 @@ import io.opentelemetry.api.common.Attributes;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -102,7 +103,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -326,11 +327,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     ConnectionHandler initConnectionHandler() {
         return new ConnectionHandler(this,
-            new BackoffBuilder()
-                .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
-                .create(),
+            Backoff.builder()
+                .initialDelay(Duration.ofNanos(client.getConfiguration().getInitialBackoffIntervalNanos()))
+                .maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()))
+                .mandatoryStop(Duration.ofMillis(Math.max(100, conf.getSendTimeoutMs() - 100)))
+                .build(),
         this);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c681959126a..2082f8b1750 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -96,7 +96,6 @@ import org.apache.pulsar.common.topics.TopicList;
 import org.apache.pulsar.common.topics.TopicsPattern;
 import org.apache.pulsar.common.topics.TopicsPatternFactory;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.jspecify.annotations.Nullable;
@@ -1249,11 +1248,11 @@ public class PulsarClientImpl implements PulsarClient {
         try {
             TopicName topicName = TopicName.get(topic);
             AtomicLong opTimeoutMs = new AtomicLong(conf.getLookupTimeoutMs());
-            Backoff backoff = new BackoffBuilder()
-                    .setInitialTime(conf.getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                    .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                    .setMax(conf.getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                    .create();
+            Backoff backoff = Backoff.builder()
+                    .initialDelay(Duration.ofNanos(conf.getInitialBackoffIntervalNanos()))
+                    .mandatoryStop(Duration.ofMillis(opTimeoutMs.get() * 2))
+                    .maxBackoff(Duration.ofNanos(conf.getMaxBackoffIntervalNanos()))
+                    .build();
             getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture,
                     new AtomicInteger(0),
                     metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
@@ -1275,7 +1274,7 @@ public class PulsarClientImpl implements PulsarClient {
                 metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
         queryFuture.thenAccept(future::complete).exceptionally(e -> {
             remainingTime.addAndGet(-1 * TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
-            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            long nextDelay = Math.min(backoff.next().toMillis(), remainingTime.get());
             // skip retry scheduler when set lookup throttle in client or server side which will lead to
             // `TooManyRequestsException`
             boolean isLookupThrottling = !PulsarClientException.isRetriableError(e.getCause())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
index 1b23b6ce2fa..a41d7634583 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.stream.Collectors;
@@ -37,7 +36,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
 import org.apache.pulsar.common.net.ServiceURI;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 
 /**
  * The default implementation of {@link ServiceNameResolver}.
@@ -220,10 +218,10 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
      * @return a new {@link EndpointStatus} instance
      */
     private EndpointStatus createEndpointStatus(boolean isAvailable, InetSocketAddress inetSocketAddress) {
-        Backoff backoff = new BackoffBuilder()
-                .setInitialTime(serviceUrlQuarantineInitDurationMs, TimeUnit.MILLISECONDS)
-                .setMax(serviceUrlQuarantineMaxDurationMs, TimeUnit.MILLISECONDS)
-                .create();
+        Backoff backoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(serviceUrlQuarantineInitDurationMs))
+                .maxBackoff(Duration.ofMillis(serviceUrlQuarantineMaxDurationMs))
+                .build();
         EndpointStatus endpointStatus =
                 new EndpointStatus(inetSocketAddress, backoff, System.currentTimeMillis(), 0,
                         isAvailable);
@@ -261,13 +259,13 @@ public class PulsarServiceNameResolver implements ServiceNameResolver {
                             Duration.ofMillis(elapsedTimeMsSinceLast));
                     status.setAvailable(true);
                     status.setLastUpdateTimeStampMs(System.currentTimeMillis());
-                    status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+                    status.setNextDelayMsToRecover(status.getQuarantineBackoff().next().toMillis());
                 }
             } else {
                 // from available to unavailable
                 status.setAvailable(false);
                 status.setLastUpdateTimeStampMs(System.currentTimeMillis());
-                status.setNextDelayMsToRecover(status.getQuarantineBackoff().next());
+                status.setNextDelayMsToRecover(status.getQuarantineBackoff().next().toMillis());
             }
         } else if (!status.isAvailable()) {
             // from unavailable to available
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
index 0c854a068a2..5f56fee818f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.ChannelHandlerContext;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
@@ -36,7 +36,7 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.topics.TopicsPattern;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,12 +77,11 @@ public class TopicListWatcher extends HandlerState implements ConnectionHandler.
         this.patternConsumerUpdateQueue = patternConsumerUpdateQueue;
         this.name = "Watcher(" + topicsPattern + ")";
         this.connectionHandler = new ConnectionHandler(this,
-                new BackoffBuilder()
-                        .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
-                                TimeUnit.NANOSECONDS)
-                        .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                        .create(),
+                Backoff.builder()
+                        .initialDelay(Duration.ofNanos(client.getConfiguration()
+                                .getInitialBackoffIntervalNanos()))
+                        .maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos()))
+                        .build(),
                 this);
         this.topicsPattern = topicsPattern;
         this.watcherId = watcherId;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 4ca742d98ea..c3dbb93c4d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -27,6 +27,7 @@ import io.netty.util.Timer;
 import io.netty.util.TimerTask;
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -48,7 +49,6 @@ import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,11 +105,12 @@ public class TransactionMetaStoreHandler extends HandlerState
                 pulsarClient.getConfiguration().getOperationTimeoutMs(), TimeUnit.MILLISECONDS);
         this.connectionHandler = new ConnectionHandler(
             this,
-            new BackoffBuilder()
-                .setInitialTime(pulsarClient.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                .setMax(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
-                .setMandatoryStop(100, TimeUnit.MILLISECONDS)
-                .create(),
+            Backoff.builder()
+                .initialDelay(Duration.ofNanos(pulsarClient.getConfiguration()
+                        .getInitialBackoffIntervalNanos()))
+                .maxBackoff(Duration.ofNanos(pulsarClient.getConfiguration().getMaxBackoffIntervalNanos()))
+                .mandatoryStop(Duration.ofMillis(100))
+                .build(),
             this);
         this.connectFuture = connectFuture;
         this.internalPinnedExecutor = pulsarClient.getInternalExecutorService();
@@ -296,7 +297,7 @@ public class TransactionMetaStoreHandler extends HandlerState
                                     }
                                 });
                             }
-                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                            , op.backoff.next().toMillis(), TimeUnit.MILLISECONDS);
                     return;
                 }
                 LOG.error("Got {} for request {} error {}", BaseCommand.Type.NEW_TXN.name(),
@@ -381,7 +382,7 @@ public class TransactionMetaStoreHandler extends HandlerState
                                     }
                                 });
                             }
-                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                            , op.backoff.next().toMillis(), TimeUnit.MILLISECONDS);
                     return;
                 }
                 LOG.error("{} for request {}, transaction {}, error: {}",
@@ -478,7 +479,7 @@ public class TransactionMetaStoreHandler extends HandlerState
                                     }
                                 });
                             }
-                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                            , op.backoff.next().toMillis(), TimeUnit.MILLISECONDS);
                     return;
                 }
                 LOG.error("{} failed for request {} error {}.", BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(),
@@ -560,7 +561,7 @@ public class TransactionMetaStoreHandler extends HandlerState
                                     }
                                 });
                             }
-                            , op.backoff.next(), TimeUnit.MILLISECONDS);
+                            , op.backoff.next().toMillis(), TimeUnit.MILLISECONDS);
                     return;
                 }
                 LOG.error("Got {} response for request {}, transaction {}, error: {}",
@@ -604,12 +605,11 @@ public class TransactionMetaStoreHandler extends HandlerState
             OpForTxnIdCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
-            op.backoff = new BackoffBuilder()
-                    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
-                            TimeUnit.NANOSECONDS)
-                    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
-                    .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                    .create();
+            op.backoff = Backoff.builder()
+                    .initialDelay(Duration.ofNanos(client.getConfiguration()
+                            .getInitialBackoffIntervalNanos()))
+                    .maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos() / 10))
+                    .build();
             op.description = description;
             op.clientCnx = clientCnx;
             return op;
@@ -646,12 +646,11 @@ public class TransactionMetaStoreHandler extends HandlerState
             OpForVoidCallBack op = RECYCLER.get();
             op.callback = callback;
             op.cmd = cmd;
-            op.backoff = new BackoffBuilder()
-                    .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
-                            TimeUnit.NANOSECONDS)
-                    .setMax(client.getConfiguration().getMaxBackoffIntervalNanos() / 10, TimeUnit.NANOSECONDS)
-                    .setMandatoryStop(0, TimeUnit.MILLISECONDS)
-                    .create();
+            op.backoff = Backoff.builder()
+                    .initialDelay(Duration.ofNanos(client.getConfiguration()
+                            .getInitialBackoffIntervalNanos()))
+                    .maxBackoff(Duration.ofNanos(client.getConfiguration().getMaxBackoffIntervalNanos() / 10))
+                    .build();
             op.description = description;
             op.clientCnx = clientCnx;
             return op;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 912cb7d7c58..0d262d5b84f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -32,10 +32,10 @@ public class RetryUtil {
     public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> supplier, Backoff backoff,
                                                ScheduledExecutorService scheduledExecutorService,
                                                CompletableFuture<T> callback) {
-        if (backoff.getMax() <= 0) {
+        if (backoff.getMax().isZero() || backoff.getMax().isNegative()) {
             throw new IllegalArgumentException("Illegal max retry time");
         }
-        if (backoff.getInitial() <= 0) {
+        if (backoff.getInitial().isZero() || backoff.getInitial().isNegative()) {
             throw new IllegalArgumentException("Illegal initial time");
         }
         scheduledExecutorService.execute(() ->
@@ -47,7 +47,7 @@ public class RetryUtil {
                                              CompletableFuture<T> callback) {
         supplier.get().whenComplete((result, e) -> {
             if (e != null) {
-                long next = backoff.next();
+                long next = backoff.next().toMillis();
                 boolean isMandatoryStop = backoff.isMandatoryStopMade();
                 if (isMandatoryStop) {
                     callback.completeExceptionally(e);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 86500b2466a..62d6c0b3f7b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -108,9 +108,9 @@ public class ConsumerImplTest {
     public void testCorrectBackoffConfiguration() {
         final Backoff backoff = consumer.getConnectionHandler().backoff;
         ClientConfigurationData clientConfigurationData = new ClientConfigurationData();
-        Assert.assertEquals(backoff.getMax(),
+        Assert.assertEquals(backoff.getMax().toMillis(),
                 TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getMaxBackoffIntervalNanos()));
-        Assert.assertEquals(backoff.next(),
+        Assert.assertEquals(backoff.next().toMillis(),
                 TimeUnit.NANOSECONDS.toMillis(clientConfigurationData.getInitialBackoffIntervalNanos()));
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
index 842d3bbaa97..5957ce86796 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Backoff.java
@@ -19,108 +19,204 @@
 package org.apache.pulsar.common.util;
 
 import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import lombok.Data;
+import lombok.Getter;
 
-// All variables are in TimeUnit millis by default
-@Data
+/**
+ * Exponential backoff with mandatory stop.
+ *
+ * <p>Delays start at {@code initialDelay} and double on every call to {@link #next()}, up to
+ * {@code maxBackoff}. A random jitter of up to 10% is subtracted from each value to avoid
+ * thundering-herd retries.
+ *
+ * <p>If a {@code mandatoryStop} duration is configured, the backoff tracks wall-clock time from the
+ * first {@link #next()} call. Once the elapsed time plus the next delay would exceed the mandatory
+ * stop, the delay is truncated so that the total does not exceed it, and {@link #isMandatoryStopMade()}
+ * returns {@code true}. After the mandatory stop, backoff continues to grow normally.
+ *
+ * <p>Use {@link #reset()} to restart the sequence from the initial delay.
+ *
+ * <pre>{@code
+ * Backoff backoff = Backoff.builder()
+ *         .initialDelay(Duration.ofMillis(100))
+ *         .maxBackoff(Duration.ofMinutes(1))
+ *         .mandatoryStop(Duration.ofSeconds(30))
+ *         .build();
+ *
+ * Duration delay = backoff.next();
+ * }</pre>
+ */
 public class Backoff {
-    public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = 
TimeUnit.MILLISECONDS.toNanos(100);
-    public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = 
TimeUnit.SECONDS.toNanos(30);
-    private final long initial;
-    private final long max;
-    private final Clock clock;
-    private long next;
-    private long mandatoryStop;
+    private static final Duration DEFAULT_INITIAL_DELAY = 
Duration.ofMillis(100);
+    private static final Duration DEFAULT_MAX_BACKOFF_INTERVAL = 
Duration.ofMinutes(1);
+    private static final Random random = new Random();
 
-    private long firstBackoffTimeInMillis;
-    private boolean mandatoryStopMade = false;
+    @Getter
+    private final Duration initial;
+    @Getter
+    private final Duration max;
+    @Getter
+    private final Duration mandatoryStop;
+    private final Clock clock;
 
-    private static final Random random = new Random();
+    private Duration next;
+    @Getter
+    private Instant firstBackoffTime;
+    @Getter
+    private boolean mandatoryStopMade;
 
-    Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, 
long mandatoryStop,
-            TimeUnit unitMandatoryStop, Clock clock) {
-        this.initial = unitInitial.toMillis(initial);
-        this.max = unitMax.toMillis(max);
-        if (initial == 0 && max == 0 && mandatoryStop == 0) {
+    private Backoff(Duration initial, Duration max, Duration mandatoryStop, 
Clock clock) {
+        this.initial = initial;
+        this.max = max;
+        this.mandatoryStop = mandatoryStop;
+        this.next = initial;
+        this.clock = clock;
+        this.firstBackoffTime = Instant.EPOCH;
+        if (initial.isZero() && max.isZero() && mandatoryStop.isZero()) {
             this.mandatoryStopMade = true;
         }
-        this.next = this.initial;
-        this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
-        this.clock = clock;
-        this.firstBackoffTimeInMillis = 0;
     }
 
-    public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit 
unitMax, long mandatoryStop,
-                   TimeUnit unitMandatoryStop) {
-        this(initial, unitInitial, max, unitMax, mandatoryStop, 
unitMandatoryStop, Clock.systemDefaultZone());
+    /**
+     * Creates a {@link Backoff} with the default configuration (initial delay 100 ms, max 1 min,
+     * no mandatory stop).
+     *
+     * @return a new Backoff with default settings
+     */
+    public static Backoff create() {
+        return new Builder().build();
     }
 
-    public long next() {
-        long current = this.next;
-        if (current < max) {
-            this.next = Math.min(this.next * 2, this.max);
+    /**
+     * Creates a new {@link Builder} with default settings.
+     *
+     * @return a new builder instance
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Returns the next backoff delay, advancing the internal state.
+     *
+     * <p>The returned duration is never less than the initial delay and never more than the max
+     * backoff. A random jitter of up to 10% is subtracted to spread out concurrent retries.
+     *
+     * @return the delay to wait before the next retry attempt
+     */
+    public Duration next() {
+        Duration current = this.next;
+        if (current.compareTo(max) < 0) {
+            Duration doubled = this.next.multipliedBy(2);
+            this.next = doubled.compareTo(this.max) < 0 ? doubled : this.max;
         }
 
         // Check for mandatory stop
         if (!mandatoryStopMade) {
-            long now = clock.millis();
-            long timeElapsedSinceFirstBackoff = 0;
-            if (initial == current) {
-                firstBackoffTimeInMillis = now;
+            Instant now = clock.instant();
+            Duration timeElapsedSinceFirstBackoff = Duration.ZERO;
+            if (initial.equals(current)) {
+                firstBackoffTime = now;
             } else {
-                timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis;
+                timeElapsedSinceFirstBackoff = 
Duration.between(firstBackoffTime, now);
             }
 
-            if (timeElapsedSinceFirstBackoff + current > mandatoryStop) {
-                current = Math.max(initial, mandatoryStop - 
timeElapsedSinceFirstBackoff);
+            if 
(timeElapsedSinceFirstBackoff.plus(current).compareTo(mandatoryStop) > 0) {
+                Duration remaining = 
mandatoryStop.minus(timeElapsedSinceFirstBackoff);
+                current = remaining.compareTo(initial) > 0 ? remaining : 
initial;
                 mandatoryStopMade = true;
             }
         }
 
         // Randomly decrease the timeout up to 10% to avoid simultaneous retries
-        // If current < 10 then current/10 < 1 and we get an exception from 
Random saying "Bound must be positive"
-        if (current > 10) {
-            current -= random.nextInt((int) current / 10);
+        long currentMillis = current.toMillis();
+        if (currentMillis > 10) {
+            currentMillis -= random.nextInt((int) currentMillis / 10);
         }
-        return Math.max(initial, current);
+        long initialMillis = initial.toMillis();
+        return Duration.ofMillis(Math.max(initialMillis, currentMillis));
     }
 
+    /**
+     * Halves the next delay (but never below the initial delay).
+     * Useful after a partially successful operation to converge faster.
+     */
     public void reduceToHalf() {
-        if (next > initial) {
-            this.next = Math.max(this.next / 2, this.initial);
+        if (next.compareTo(initial) > 0) {
+            Duration half = next.dividedBy(2);
+            this.next = half.compareTo(initial) > 0 ? half : initial;
         }
     }
 
+    /**
+     * Resets the backoff to its initial state so the next call to {@link #next()} returns the
+     * initial delay again. Also resets the mandatory-stop tracking.
+     */
     public void reset() {
         this.next = this.initial;
-        if (initial == 0 && max == 0 && mandatoryStop == 0) {
-            this.mandatoryStopMade = true;
-        } else {
-            this.mandatoryStopMade = false;
-        }
+        this.mandatoryStopMade = initial.isZero() && max.isZero() && 
mandatoryStop.isZero();
     }
 
-    public static boolean shouldBackoff(long initialTimestamp, TimeUnit 
unitInitial, int failedAttempts,
-                                        long defaultInterval, long 
maxBackoffInterval) {
-        long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
-        long currentTime = System.nanoTime();
-        long interval = defaultInterval;
-        for (int i = 1; i < failedAttempts; i++) {
-            interval = interval * 2;
-            if (interval > maxBackoffInterval) {
-                interval = maxBackoffInterval;
-                break;
-            }
+    /**
+     * Builder for {@link Backoff}.
+     *
+     * <p>Defaults: initial delay 100 ms, max backoff 1 min, no mandatory stop.
+     */
+    public static class Builder {
+        private Duration initialDelay = DEFAULT_INITIAL_DELAY;
+        private Duration maxBackoff = DEFAULT_MAX_BACKOFF_INTERVAL;
+        private Duration mandatoryStop = Duration.ZERO;
+        private Clock clock = Clock.systemDefaultZone();
+
+        /**
+         * Sets the initial (smallest) backoff delay. Defaults to 100 ms.
+         *
+         * @param initialDelay the initial delay
+         * @return this builder
+         */
+        public Builder initialDelay(Duration initialDelay) {
+            this.initialDelay = initialDelay;
+            return this;
         }
 
-        // if the current time is less than the time at which next retry 
should occur, we should backoff
-        return currentTime < (initialTimestampInNano + interval);
-    }
+        /**
+         * Sets the upper bound for the backoff delay. Defaults to 1 min.
+         *
+         * @param maxBackoff the maximum delay
+         * @return this builder
+         */
+        public Builder maxBackoff(Duration maxBackoff) {
+            this.maxBackoff = maxBackoff;
+            return this;
+        }
+
+        /**
+         * Sets the mandatory-stop deadline measured from the first {@link Backoff#next()} call.
+         * Once wall-clock time exceeds this duration the current delay is truncated and
+         * {@link Backoff#isMandatoryStopMade()} returns {@code true}. Defaults to zero (disabled).
+         *
+         * @param mandatoryStop the mandatory stop duration
+         * @return this builder
+         */
+        public Builder mandatoryStop(Duration mandatoryStop) {
+            this.mandatoryStop = mandatoryStop;
+            return this;
+        }
 
-    public static boolean shouldBackoff(long initialTimestamp, TimeUnit 
unitInitial, int failedAttempts) {
-        return Backoff.shouldBackoff(initialTimestamp, unitInitial, 
failedAttempts,
-                                     DEFAULT_INTERVAL_IN_NANOSECONDS, 
MAX_BACKOFF_INTERVAL_NANOSECONDS);
+        Builder clock(Clock clock) {
+            this.clock = clock;
+            return this;
+        }
+
+        /**
+         * Builds a new {@link Backoff} instance with the configured parameters.
+         *
+         * @return a new Backoff
+         */
+        public Backoff build() {
+            return new Backoff(initialDelay, maxBackoff, mandatoryStop, clock);
+        }
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
deleted file mode 100644
index 69b39030081..00000000000
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/BackoffBuilder.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util;
-
-import java.time.Clock;
-import java.util.concurrent.TimeUnit;
-
-public class BackoffBuilder {
-    private long initial;
-    private TimeUnit unitInitial;
-    private long max;
-    private TimeUnit unitMax;
-    private Clock clock;
-    private long mandatoryStop;
-    private TimeUnit unitMandatoryStop;
-
-    public BackoffBuilder() {
-        this.initial = 0;
-        this.unitInitial = TimeUnit.MILLISECONDS;
-        this.max = 0;
-        this.unitMax = TimeUnit.MILLISECONDS;
-        this.mandatoryStop = 0;
-        this.unitMandatoryStop = TimeUnit.MILLISECONDS;
-        this.clock = Clock.systemDefaultZone();
-    }
-
-    public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
-        this.unitInitial = unitInitial;
-        this.initial = initial;
-        return this;
-    }
-
-    public BackoffBuilder setMax(long max, TimeUnit unitMax) {
-        this.unitMax = unitMax;
-        this.max = max;
-        return this;
-    }
-
-    public BackoffBuilder setMandatoryStop(long mandatoryStop, TimeUnit 
unitMandatoryStop) {
-        this.mandatoryStop = mandatoryStop;
-        this.unitMandatoryStop = unitMandatoryStop;
-        return this;
-    }
-
-
-    public Backoff create() {
-        return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, 
unitMandatoryStop, clock);
-    }
-}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
index 9ca9503b5c0..152ddee5156 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/BackoffTest.java
@@ -22,35 +22,31 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import java.time.Clock;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
-import java.util.concurrent.TimeUnit;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class BackoffTest {
     boolean withinTenPercentAndDecrementTimer(Backoff backoff, long t2) {
-        long t1 = backoff.next();
+        long t1 = backoff.next().toMillis();
         return (t1 >= t2 * 0.9 && t1 <= t2);
     }
 
     boolean checkExactAndDecrementTimer(Backoff backoff, long t2) {
-        long t1 = backoff.next();
+        long t1 = backoff.next().toMillis();
         return t1 == t2;
     }
-    @Test
-    public void shouldBackoffTest() {
-        // gives false
-        assertFalse(Backoff.shouldBackoff(0L, TimeUnit.NANOSECONDS, 0));
-        long currentTimestamp = System.nanoTime();
-        // gives true
-        assertTrue(Backoff.shouldBackoff(currentTimestamp, 
TimeUnit.NANOSECONDS, 100));
-    }
 
     @Test
     public void mandatoryStopTestNegativeTest() {
-        Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, 
TimeUnit.SECONDS, 1900, TimeUnit.MILLISECONDS);
-        assertEquals(backoff.next(), 100);
+        Backoff backoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(60))
+                .mandatoryStop(Duration.ofMillis(1900))
+                .build();
+        assertEquals(backoff.next().toMillis(), 100);
         backoff.next(); // 200
         backoff.next(); // 400
         backoff.next(); // 800
@@ -60,30 +56,35 @@ public class BackoffTest {
     @Test
     public void firstBackoffTimerTest() {
         Clock mockClock = Mockito.mock(Clock.class);
-        Mockito.when(mockClock.millis())
-            .thenReturn(0L)
-            .thenReturn(300L);
+        Mockito.when(mockClock.instant())
+            .thenReturn(Instant.ofEpochMilli(0))
+            .thenReturn(Instant.ofEpochMilli(300));
 
-        Backoff backoff = new Backoff(
-            100, TimeUnit.MILLISECONDS,
-            60, TimeUnit.SECONDS,
-            1900, TimeUnit.MILLISECONDS,
-            mockClock
-        );
+        Backoff backoff = Backoff.builder()
+            .initialDelay(Duration.ofMillis(100))
+            .maxBackoff(Duration.ofSeconds(60))
+            .mandatoryStop(Duration.ofMillis(1900))
+            .clock(mockClock)
+            .build();
 
-        assertEquals(backoff.next(), 100);
+        assertEquals(backoff.next().toMillis(), 100);
 
-        long firstBackOffTime = backoff.getFirstBackoffTimeInMillis();
+        Instant firstBackOffTime = backoff.getFirstBackoffTime();
         backoff.reset();
-        assertEquals(backoff.next(), 100);
-        long diffBackOffTime = backoff.getFirstBackoffTimeInMillis() - 
firstBackOffTime;
+        assertEquals(backoff.next().toMillis(), 100);
+        long diffBackOffTime = Duration.between(firstBackOffTime, 
backoff.getFirstBackoffTime()).toMillis();
         assertEquals(diffBackOffTime, 300);
     }
 
     @Test
     public void basicTest() {
         Clock mockClock = Clock.fixed(Instant.EPOCH, ZoneId.systemDefault());
-        Backoff backoff = new Backoff(5, TimeUnit.MILLISECONDS, 60, 
TimeUnit.SECONDS, 60, TimeUnit.SECONDS, mockClock);
+        Backoff backoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(5))
+                .maxBackoff(Duration.ofSeconds(60))
+                .mandatoryStop(Duration.ofSeconds(60))
+                .clock(mockClock)
+                .build();
         assertTrue(checkExactAndDecrementTimer(backoff, 5));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 10));
         backoff.reset();
@@ -93,18 +94,18 @@ public class BackoffTest {
     @Test
     public void maxTest() {
         Clock mockClock = Mockito.mock(Clock.class);
-        Mockito.when(mockClock.millis())
-            .thenReturn(0L)
-            .thenReturn(10L)
-            .thenReturn(20L)
-            .thenReturn(40L);
-
-        Backoff backoff = new Backoff(
-            5, TimeUnit.MILLISECONDS,
-            20, TimeUnit.MILLISECONDS,
-            20, TimeUnit.MILLISECONDS,
-            mockClock
-        );
+        Mockito.when(mockClock.instant())
+            .thenReturn(Instant.ofEpochMilli(0))
+            .thenReturn(Instant.ofEpochMilli(10))
+            .thenReturn(Instant.ofEpochMilli(20))
+            .thenReturn(Instant.ofEpochMilli(40));
+
+        Backoff backoff = Backoff.builder()
+            .initialDelay(Duration.ofMillis(5))
+            .maxBackoff(Duration.ofMillis(20))
+            .mandatoryStop(Duration.ofMillis(20))
+            .clock(mockClock)
+            .build();
 
         assertTrue(checkExactAndDecrementTimer(backoff, 5));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 10));
@@ -116,73 +117,73 @@ public class BackoffTest {
     public void mandatoryStopTest() {
         Clock mockClock = Mockito.mock(Clock.class);
 
-        Backoff backoff = new Backoff(
-            100, TimeUnit.MILLISECONDS,
-            60, TimeUnit.SECONDS,
-            1900, TimeUnit.MILLISECONDS,
-            mockClock
-        );
+        Backoff backoff = Backoff.builder()
+            .initialDelay(Duration.ofMillis(100))
+            .maxBackoff(Duration.ofSeconds(60))
+            .mandatoryStop(Duration.ofMillis(1900))
+            .clock(mockClock)
+            .build();
 
-        Mockito.when(mockClock.millis()).thenReturn(0L);
+        Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
         assertTrue(checkExactAndDecrementTimer(backoff, 100));
-        Mockito.when(mockClock.millis()).thenReturn(100L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
-        Mockito.when(mockClock.millis()).thenReturn(300L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
-        Mockito.when(mockClock.millis()).thenReturn(700L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
-        Mockito.when(mockClock.millis()).thenReturn(1500L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500));
 
         // would have been 1600 w/o the mandatory stop
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
         assertTrue(backoff.isMandatoryStopMade());
-        Mockito.when(mockClock.millis()).thenReturn(1900L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1900));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200));
-        Mockito.when(mockClock.millis()).thenReturn(3200L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400));
-        Mockito.when(mockClock.millis()).thenReturn(3200L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(3200));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800));
-        Mockito.when(mockClock.millis()).thenReturn(6400L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(6400));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600));
-        Mockito.when(mockClock.millis()).thenReturn(12800L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(12800));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200));
-        Mockito.when(mockClock.millis()).thenReturn(25600L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(25600));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000));
-        Mockito.when(mockClock.millis()).thenReturn(51200L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(51200));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000));
-        Mockito.when(mockClock.millis()).thenReturn(60000L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(60000));
 
         backoff.reset();
-        Mockito.when(mockClock.millis()).thenReturn(0L);
+        Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
         assertTrue(checkExactAndDecrementTimer(backoff, 100));
-        Mockito.when(mockClock.millis()).thenReturn(100L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
-        Mockito.when(mockClock.millis()).thenReturn(300L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
-        Mockito.when(mockClock.millis()).thenReturn(700L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
-        Mockito.when(mockClock.millis()).thenReturn(1500L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(1500));
         // would have been 1600 w/o the mandatory stop
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
 
         backoff.reset();
-        Mockito.when(mockClock.millis()).thenReturn(0L);
+        Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
         assertTrue(checkExactAndDecrementTimer(backoff, 100));
-        Mockito.when(mockClock.millis()).thenReturn(100L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
-        Mockito.when(mockClock.millis()).thenReturn(300L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
-        Mockito.when(mockClock.millis()).thenReturn(700L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
 
         backoff.reset();
-        Mockito.when(mockClock.millis()).thenReturn(0L);
+        Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(0));
         assertTrue(checkExactAndDecrementTimer(backoff, 100));
-        Mockito.when(mockClock.millis()).thenReturn(100L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(100));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 200));
-        Mockito.when(mockClock.millis()).thenReturn(300L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(300));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 400));
-        Mockito.when(mockClock.millis()).thenReturn(700L);
+        
Mockito.when(mockClock.instant()).thenReturn(Instant.ofEpochMilli(700));
         assertTrue(withinTenPercentAndDecrementTimer(backoff, 800));
     }
 
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
index e5334038290..e797850f4f6 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataCacheConfig.java
@@ -18,13 +18,13 @@
  */
 package org.apache.pulsar.metadata.api;
 
+import java.time.Duration;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
 
 /**
  * The configuration builder for a {@link MetadataCache} config.
@@ -33,16 +33,16 @@ import org.apache.pulsar.common.util.BackoffBuilder;
 @Getter
 @ToString
 public class MetadataCacheConfig<T> {
-    private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
-    public static final BackoffBuilder DEFAULT_RETRY_BACKOFF_BUILDER =
-            new BackoffBuilder().setInitialTime(5, TimeUnit.MILLISECONDS)
-                    .setMax(3, TimeUnit.SECONDS)
-                    .setMandatoryStop(30, TimeUnit.SECONDS);
+    private static final long DEFAULT_CACHE_REFRESH_TIME_MILLIS = 
Duration.ofMinutes(5).toMillis();
+    public static final Backoff.Builder DEFAULT_RETRY_BACKOFF_BUILDER =
+            Backoff.builder().initialDelay(Duration.ofMillis(5))
+                    .maxBackoff(Duration.ofSeconds(3))
+                    .mandatoryStop(Duration.ofSeconds(30));
 
-    public static final BackoffBuilder NO_RETRY_BACKOFF_BUILDER =
-            new BackoffBuilder().setInitialTime(0, TimeUnit.MILLISECONDS)
-                    .setMax(0, TimeUnit.SECONDS)
-                    .setMandatoryStop(0, TimeUnit.SECONDS);
+    public static final Backoff.Builder NO_RETRY_BACKOFF_BUILDER =
+            Backoff.builder().initialDelay(Duration.ZERO)
+                    .maxBackoff(Duration.ZERO)
+                    .mandatoryStop(Duration.ZERO);
 
     /**
      * Specifies that active entries are eligible for automatic refresh once a 
fixed duration has
@@ -68,6 +68,6 @@ public class MetadataCacheConfig<T> {
     private final BiConsumer<String, Optional<CacheGetResult<T>>> 
asyncReloadConsumer = null;
 
     @Builder.Default
-    private final BackoffBuilder retryBackoff = DEFAULT_RETRY_BACKOFF_BUILDER;
+    private final Backoff.Builder retryBackoff = DEFAULT_RETRY_BACKOFF_BUILDER;
 
 }
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 25890852775..e27f9338f54 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -25,6 +25,8 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
@@ -373,9 +375,9 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                 // if resource is updated by other than metadata-cache then 
metadata-cache will get bad-version
                 // exception. so, try to invalidate the cache and try one more 
time.
                 objCache.synchronous().invalidate(key);
-                long elapsed = System.currentTimeMillis() - 
backoff.getFirstBackoffTimeInMillis();
+                long elapsed = Duration.between(backoff.getFirstBackoffTime(), 
Instant.now()).toMillis();
                 if (backoff.isMandatoryStopMade()) {
-                    if (backoff.getFirstBackoffTimeInMillis() == 0) {
+                    if (Instant.EPOCH.equals(backoff.getFirstBackoffTime())) {
                         result.completeExceptionally(ex.getCause());
                     } else {
                         result.completeExceptionally(new TimeoutException(
@@ -383,10 +385,10 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
                     }
                     return null;
                 }
-                final var next = backoff.next();
+                final long nextMs = backoff.next().toMillis();
                 log.info("Update key {} conflicts. Retrying in {} ms. 
Mandatory stop: {}. Elapsed time: {} ms", key,
-                        next, backoff.isMandatoryStopMade(), elapsed);
-                schedulerExecutor.schedule(() -> execute(op, key, result, 
backoff), next, TimeUnit.MILLISECONDS);
+                        nextMs, backoff.isMandatoryStopMade(), elapsed);
+                schedulerExecutor.schedule(() -> execute(op, key, result, 
backoff), nextMs, TimeUnit.MILLISECONDS);
                 return null;
             }
             result.completeExceptionally(ex.getCause());
@@ -395,7 +397,7 @@ public class MetadataCacheImpl<T> implements 
MetadataCache<T>, Consumer<Notifica
     }
 
     private CompletableFuture<T> 
executeWithRetry(Supplier<CompletableFuture<T>> op, String key) {
-        final var backoff = cacheConfig.getRetryBackoff().create();
+        final var backoff = cacheConfig.getRetryBackoff().build();
         CompletableFuture<T> result = new CompletableFuture<>();
         execute(op, key, result, backoff);
         return result;
diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
index 692f224594c..2fe766a706a 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/ResourceLockImpl.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.common.util.Backoff;
-import org.apache.pulsar.common.util.BackoffBuilder;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -73,10 +72,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
         this.sequencer = FutureUtil.Sequencer.create();
         this.state = State.Init;
         this.executor = executor;
-        this.backoff = new BackoffBuilder()
-                .setInitialTime(100, TimeUnit.MILLISECONDS)
-                .setMax(60, TimeUnit.SECONDS)
-                .create();
+        this.backoff = Backoff.create();
     }
 
     @Override
@@ -251,7 +247,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
                             // on Reconnected or SessionReestablished events.
                             revalidateAfterReconnection = true;
 
-                            long delayMillis = backoff.next();
+                            long delayMillis = backoff.next().toMillis();
                             log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds", path,
                                     realCause.getMessage(), delayMillis / 1000.0);
                             revalidateTask =
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index d188b63ba15..69e23775369 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -52,7 +53,7 @@ import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.policies.data.Policies;
-import org.apache.pulsar.common.util.BackoffBuilder;
+import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.CacheGetResult;
@@ -524,10 +525,10 @@ public class MetadataCacheTest extends BaseMetadataStoreTest {
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
 
         MetadataCache<MyClass> cache = store.getMetadataCache(MyClass.class, MetadataCacheConfig.builder()
-                .retryBackoff(new BackoffBuilder()
-                        .setInitialTime(5, TimeUnit.MILLISECONDS)
-                        .setMax(1, TimeUnit.SECONDS)
-                        .setMandatoryStop(3, TimeUnit.SECONDS)).build());
+                .retryBackoff(Backoff.builder()
+                        .initialDelay(Duration.ofMillis(5))
+                        .maxBackoff(Duration.ofSeconds(1))
+                        .mandatoryStop(Duration.ofSeconds(3))).build());
 
         MetadataCache<MyClass> cacheRef = cache;
         if (cache instanceof DualMetadataCache dc) {
@@ -704,10 +705,10 @@ public class MetadataCacheTest extends BaseMetadataStoreTest {
         final var config = MetadataCacheConfig.builder().build();
         assertEquals(config.getRefreshAfterWriteMillis(), TimeUnit.MINUTES.toMillis(5));
         assertEquals(config.getExpireAfterWriteMillis(), TimeUnit.MINUTES.toMillis(10));
-        final var backoff = config.getRetryBackoff().create();
-        assertEquals(backoff.getInitial(), 5);
-        assertEquals(backoff.getMax(), 3000);
-        assertEquals(backoff.getMandatoryStop(), 30_000);
+        final var backoff = config.getRetryBackoff().build();
+        assertEquals(backoff.getInitial(), Duration.ofMillis(5));
+        assertEquals(backoff.getMax(), Duration.ofSeconds(3));
+        assertEquals(backoff.getMandatoryStop(), Duration.ofSeconds(30));
     }
 
     @Test
@@ -715,36 +716,36 @@ public class MetadataCacheTest extends BaseMetadataStoreTest {
         final var config = MetadataCacheConfig.builder().retryBackoff(
                 MetadataCacheConfig.NO_RETRY_BACKOFF_BUILDER).build();
 
-        final var backoff = config.getRetryBackoff().create();
+        final var backoff = config.getRetryBackoff().build();
 
-        assertEquals(backoff.getInitial(), 0);
-        assertEquals(backoff.getMax(), 0);
-        assertEquals(backoff.getMandatoryStop(), 0);
+        assertEquals(backoff.getInitial(), Duration.ZERO);
+        assertEquals(backoff.getMax(), Duration.ZERO);
+        assertEquals(backoff.getMandatoryStop(), Duration.ZERO);
         assertTrue(backoff.isMandatoryStopMade());
-        assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
-        assertEquals(backoff.next(), 0);
-        assertEquals(backoff.next(), 0);
-        assertEquals(backoff.next(), 0);
+        assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
+        assertEquals(backoff.next(), Duration.ZERO);
+        assertEquals(backoff.next(), Duration.ZERO);
+        assertEquals(backoff.next(), Duration.ZERO);
         assertTrue(backoff.isMandatoryStopMade());
-        assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
+        assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
 
         backoff.reduceToHalf();
         assertTrue(backoff.isMandatoryStopMade());
-        assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
-        assertEquals(backoff.next(), 0);
-        assertEquals(backoff.next(), 0);
-        assertEquals(backoff.next(), 0);
+        assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
+        assertEquals(backoff.next(), Duration.ZERO);
+        assertEquals(backoff.next(), Duration.ZERO);
+        assertEquals(backoff.next(), Duration.ZERO);
         assertTrue(backoff.isMandatoryStopMade());
-        assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
+        assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
 
         backoff.reset();
         assertTrue(backoff.isMandatoryStopMade());
-        assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
-        assertEquals(backoff.next(), 0);
-        assertEquals(backoff.next(), 0);
-        assertEquals(backoff.next(), 0);
+        assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
+        assertEquals(backoff.next(), Duration.ZERO);
+        assertEquals(backoff.next(), Duration.ZERO);
+        assertEquals(backoff.next(), Duration.ZERO);
         assertTrue(backoff.isMandatoryStopMade());
-        assertEquals(backoff.getFirstBackoffTimeInMillis(), 0);
+        assertEquals(backoff.getFirstBackoffTime(), Instant.EPOCH);
     }
 
     @Test

Reply via email to