This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 7830691  Add more unit test (#146)
7830691 is described below

commit 783069154f4e39c9a96118b3e147643a57ef9b75
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Aug 9 15:33:01 2022 +0800

    Add more unit test (#146)
---
 .../rocketmq/client/java/impl/ClientImpl.java      |   2 +-
 .../client/java/impl/producer/ProducerImpl.java    |  67 ++++----
 .../java/impl/producer/ProducerSettings.java       |   2 +-
 .../client/java/impl/producer/SendReceiptImpl.java |   2 +-
 .../client/java/message/PublishingMessageImpl.java |   5 +-
 .../java/retry/CustomizedBackoffRetryPolicy.java   |  10 +-
 .../java/retry/ExponentialBackoffRetryPolicy.java  |  18 +-
 .../rocketmq/client/java/retry/RetryPolicy.java    |   5 +-
 .../java/impl/producer/ProducerImplTest.java       | 182 ++++++---------------
 .../retry/CustomizedBackoffRetryPolicyTest.java    | 101 +++++++++++-
 .../retry/ExponentialBackoffRetryPolicyTest.java   | 152 +++++++++++++++++
 .../apache/rocketmq/client/java/tool/TestBase.java |   6 +-
 java/pom.xml                                       |   3 +
 13 files changed, 372 insertions(+), 183 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index d67ae57..0a3257a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -621,7 +621,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         return future;
     }
 
-    ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
+    protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String 
topic) {
         try {
             Resource topicResource = 
Resource.newBuilder().setName(topic).build();
             final QueryRouteRequest request = 
QueryRouteRequest.newBuilder().setTopic(topicResource)
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index c40cf9c..59a2bd9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -88,9 +88,8 @@ class ProducerImpl extends ClientImpl implements Producer {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProducerImpl.class);
 
     protected final ProducerSettings producerSettings;
-
+    final ConcurrentMap<String/* topic */, PublishingLoadBalancer> 
publishingRouteDataCache;
     private final TransactionChecker checker;
-    private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> 
publishingRouteDataCache;
 
     /**
      * The caller is supposed to have validated the arguments and handled 
throwing exception or
@@ -197,7 +196,7 @@ class ProducerImpl extends ClientImpl implements Producer {
      */
     @Override
     public SendReceipt send(Message message) throws ClientException {
-        final ListenableFuture<SendReceipt> future = 
Futures.transform(send0(Collections.singletonList(message), false),
+        final ListenableFuture<SendReceipt> future = 
Futures.transform(send(Collections.singletonList(message), false),
             sendReceipts -> sendReceipts.iterator().next(), 
MoreExecutors.directExecutor());
         return handleClientFuture(future);
     }
@@ -217,7 +216,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         } catch (Throwable t) {
             throw new ClientException(t);
         }
-        final ListenableFuture<List<SendReceiptImpl>> future = 
send0(Collections.singletonList(publishingMessage),
+        final ListenableFuture<List<SendReceiptImpl>> future = 
send(Collections.singletonList(publishingMessage),
             true);
         final List<SendReceiptImpl> receipts = handleClientFuture(future);
         final SendReceiptImpl sendReceipt = receipts.iterator().next();
@@ -230,7 +229,7 @@ class ProducerImpl extends ClientImpl implements Producer {
      */
     @Override
     public CompletableFuture<SendReceipt> sendAsync(Message message) {
-        final ListenableFuture<SendReceipt> future = 
Futures.transform(send0(Collections.singletonList(message), false),
+        final ListenableFuture<SendReceipt> future = 
Futures.transform(send(Collections.singletonList(message), false),
             sendReceipts -> sendReceipts.iterator().next(), 
MoreExecutors.directExecutor());
         return FutureConverter.toCompletableFuture(future);
     }
@@ -329,7 +328,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         return result.takeMessageQueues(isolated, 
this.getRetryPolicy().getMaxAttempts());
     }
 
-    private ListenableFuture<List<SendReceiptImpl>> send0(List<Message> 
messages, boolean txEnabled) {
+    private ListenableFuture<List<SendReceiptImpl>> send(List<Message> 
messages, boolean txEnabled) {
         SettableFuture<List<SendReceiptImpl>> future = SettableFuture.create();
 
         // Check producer state before message publishing.
@@ -419,52 +418,52 @@ class ProducerImpl extends ClientImpl implements Producer 
{
     /**
      * The caller is supposed to make sure different messages have the same 
message type and same topic.
      */
-    private SendMessageRequest 
wrapSendMessageRequest(List<PublishingMessageImpl> messages) {
-        return SendMessageRequest.newBuilder()
-            
.addAllMessages(messages.stream().map(PublishingMessageImpl::toProtobuf).collect(Collectors.toList()))
-            .build();
+    private SendMessageRequest 
wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages, 
MessageQueueImpl mq) {
+        final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
+            .map(publishingMessage -> 
publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
+        return 
SendMessageRequest.newBuilder().addAllMessages(messages).build();
     }
 
-    private void send0(SettableFuture<List<SendReceiptImpl>> future, String 
topic, MessageType messageType,
+    ListenableFuture<List<SendReceiptImpl>> send0(Metadata metadata, Endpoints 
endpoints,
+        List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
+        final SendMessageRequest request = wrapSendMessageRequest(pubMessages, 
mq);
+        final ListenableFuture<RpcInvocation<SendMessageResponse>> future0 =
+            clientManager.sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
+        return Futures.transformAsync(future0,
+            invocation -> 
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq, 
invocation)),
+            MoreExecutors.directExecutor());
+    }
+
+    private void send0(SettableFuture<List<SendReceiptImpl>> future0, String 
topic, MessageType messageType,
         final List<MessageQueueImpl> candidates, final 
List<PublishingMessageImpl> messages, final int attempt) {
         Metadata metadata;
         try {
             metadata = sign();
         } catch (Throwable t) {
             // Failed to sign, no need to proceed.
-            future.setException(t);
+            future0.setException(t);
             return;
         }
         // Calculate the current message queue.
-        final MessageQueueImpl messageQueue = 
candidates.get(IntMath.mod(attempt - 1, candidates.size()));
-        final List<MessageType> acceptMessageTypes = 
messageQueue.getAcceptMessageTypes();
+        final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1, 
candidates.size()));
+        final List<MessageType> acceptMessageTypes = 
mq.getAcceptMessageTypes();
         if (producerSettings.isValidateMessageType() && 
!acceptMessageTypes.contains(messageType)) {
             final IllegalArgumentException e = new 
IllegalArgumentException("Current message type not match with "
                 + "topic accept message types, topic=" + topic + ", 
actualMessageType=" + messageType + ", "
                 + "acceptMessageTypes=" + acceptMessageTypes);
-            future.setException(e);
+            future0.setException(e);
             return;
         }
-        final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
-        final SendMessageRequest request = wrapSendMessageRequest(messages);
-
-        final ListenableFuture<RpcInvocation<SendMessageResponse>> 
responseFuture =
-            clientManager.sendMessage(endpoints, metadata, request, 
clientConfiguration.getRequestTimeout());
-
-        final ListenableFuture<List<SendReceiptImpl>> attemptFuture = 
Futures.transformAsync(responseFuture,
-            invocation -> 
Futures.immediateFuture(SendReceiptImpl.processSendMessageResponseInvocation(messageQueue,
-                invocation)),
-            MoreExecutors.directExecutor());
-
+        final Endpoints endpoints = mq.getBroker().getEndpoints();
+        final ListenableFuture<List<SendReceiptImpl>> future = send0(metadata, 
endpoints, messages, mq);
         final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
-
         // Intercept before message publishing.
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons =
             
messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
         doBefore(MessageHookPoints.SEND, messageCommons);
 
-        Futures.addCallback(attemptFuture, new 
FutureCallback<List<SendReceiptImpl>>() {
+        Futures.addCallback(future, new 
FutureCallback<List<SendReceiptImpl>>() {
             @Override
             public void onSuccess(List<SendReceiptImpl> sendReceipts) {
                 // Intercept after message publishing.
@@ -475,11 +474,11 @@ class ProducerImpl extends ClientImpl implements Producer 
{
                     final InternalErrorException e = new 
InternalErrorException("[Bug] due to an"
                         + " unknown reason from remote, received send 
receipt's quantity " + sendReceipts.size()
                         + " is not equal to sent message's quantity " + 
messages.size());
-                    future.setException(e);
+                    future0.setException(e);
                     return;
                 }
                 // No need more attempts.
-                future.set(sendReceipts);
+                future0.set(sendReceipts);
                 // Resend message(s) successfully.
                 if (1 < attempt) {
                     // Collect messageId(s) for logging.
@@ -509,7 +508,7 @@ class ProducerImpl extends ClientImpl implements Producer {
                 isolate(endpoints);
                 if (attempt >= maxAttempts) {
                     // No need more attempts.
-                    future.setException(t);
+                    future0.setException(t);
                     LOGGER.error("Failed to send message(s) finally, run out 
of attempt times, maxAttempts={}, " +
                             "attempt={}, topic={}, messageId(s)={}, 
endpoints={}, clientId={}",
                         maxAttempts, attempt, topic, messageIds, endpoints, 
clientId, t);
@@ -517,7 +516,7 @@ class ProducerImpl extends ClientImpl implements Producer {
                 }
                 // No need more attempts for transactional message.
                 if (MessageType.TRANSACTION.equals(messageType)) {
-                    future.setException(t);
+                    future0.setException(t);
                     LOGGER.error("Failed to send transactional message 
finally, maxAttempts=1, attempt={}, " +
                             "topic={}, messageId(s)={}, endpoints={}, 
clientId={}", attempt, topic, messageIds,
                         endpoints, clientId, t);
@@ -530,14 +529,14 @@ class ProducerImpl extends ClientImpl implements Producer 
{
                     LOGGER.warn("Failed to send message, would attempt to 
resend right now, maxAttempts={}, "
                             + "attempt={}, topic={}, messageId(s)={}, 
endpoints={}, clientId={}", maxAttempts, attempt,
                         topic, messageIds, endpoints, clientId, t);
-                    send0(future, topic, messageType, candidates, messages, 
nextAttempt);
+                    send0(future0, topic, messageType, candidates, messages, 
nextAttempt);
                     return;
                 }
                 final Duration delay = 
ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
                 LOGGER.warn("Failed to send message due to too many requests, 
would attempt to resend after {}, "
                         + "maxAttempts={}, attempt={}, topic={}, 
messageId(s)={}, endpoints={}, clientId={}", delay,
                     maxAttempts, attempt, topic, messageIds, endpoints, 
clientId, t);
-                clientManager.getScheduler().schedule(() -> send0(future, 
topic, messageType, candidates, messages,
+                clientManager.getScheduler().schedule(() -> send0(future0, 
topic, messageType, candidates, messages,
                     nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
             }
         }, clientCallbackExecutor);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 687fb96..4919927 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -81,7 +81,7 @@ public class ProducerSettings extends ClientSettings {
         final apache.rocketmq.v2.RetryPolicy backoffPolicy = 
settings.getBackoffPolicy();
         final Publishing publishing = settings.getPublishing();
         RetryPolicy exist = retryPolicy;
-        this.retryPolicy = exist.updateBackoff(backoffPolicy);
+        this.retryPolicy = exist.inheritBackoff(backoffPolicy);
         this.validateMessageType = 
settings.getPublishing().getValidateMessageType();
         this.maxBodySizeBytes = publishing.getMaxBodySize();
         this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 2ed3cc7..114e789 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -70,7 +70,7 @@ public class SendReceiptImpl implements SendReceipt {
         return offset;
     }
 
-    public static List<SendReceiptImpl> 
processSendMessageResponseInvocation(MessageQueueImpl mq,
+    public static List<SendReceiptImpl> 
processResponseInvocation(MessageQueueImpl mq,
         RpcInvocation<SendMessageResponse> invocation) throws ClientException {
         final SendMessageResponse response = invocation.getResponse();
         Status status = response.getStatus();
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 72afd9a..5edfab1 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
 import org.apache.rocketmq.client.java.message.protocol.Encoding;
 import org.apache.rocketmq.client.java.misc.Utilities;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 
 /**
  * This class is a publishing view for message, which could be considered as 
an extension of {@link MessageImpl}.
@@ -97,7 +98,7 @@ public class PublishingMessageImpl extends MessageImpl {
      * <p>This method should be invoked before each message sending, because 
the born time is reset before each
      * invocation, which means that it should not be invoked ahead of time.
      */
-    public apache.rocketmq.v2.Message toProtobuf() {
+    public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
         final apache.rocketmq.v2.SystemProperties.Builder 
systemPropertiesBuilder =
             apache.rocketmq.v2.SystemProperties.newBuilder()
                 // Message keys
@@ -110,6 +111,8 @@ public class PublishingMessageImpl extends MessageImpl {
                 .setBornHost(Utilities.hostName())
                 // Body encoding
                 .setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
+                // Queue id
+                .setQueueId(mq.getQueueId())
                 // Message type
                 .setMessageType(MessageType.toProtobuf(messageType));
         // Message tag
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 59b5e4c..a1cf11a 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -42,6 +42,10 @@ public class CustomizedBackoffRetryPolicy implements 
RetryPolicy {
         return maxAttempts;
     }
 
+    List<Duration> getDurations() {
+        return durations;
+    }
+
     @Override
     public Duration getNextAttemptDelay(int attempt) {
         checkArgument(attempt > 0, "attempt must be positive");
@@ -68,14 +72,14 @@ public class CustomizedBackoffRetryPolicy implements 
RetryPolicy {
     }
 
     @Override
-    public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy 
retryPolicy) {
+    public RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy 
retryPolicy) {
         if (!CUSTOMIZED_BACKOFF.equals(retryPolicy.getStrategyCase())) {
             throw new IllegalArgumentException("strategy must be customized 
backoff");
         }
-        return updateBackoff(retryPolicy.getCustomizedBackoff());
+        return inheritBackoff(retryPolicy.getCustomizedBackoff());
     }
 
-    private RetryPolicy updateBackoff(CustomizedBackoff backoff) {
+    private RetryPolicy inheritBackoff(CustomizedBackoff backoff) {
         final List<Duration> durations = backoff.getNextList().stream()
             .map(duration -> Duration.ofNanos(Durations.toNanos(duration)))
             .collect(Collectors.toList());
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index ed82dfe..cd8c256 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -55,6 +55,18 @@ public class ExponentialBackoffRetryPolicy implements 
RetryPolicy {
         return maxAttempts;
     }
 
+    Duration getInitialBackoff() {
+        return initialBackoff;
+    }
+
+    Duration getMaxBackoff() {
+        return maxBackoff;
+    }
+
+    double getBackoffMultiplier() {
+        return backoffMultiplier;
+    }
+
     @Override
     public Duration getNextAttemptDelay(int attempt) {
         checkArgument(attempt > 0, "attempt must be positive");
@@ -88,14 +100,14 @@ public class ExponentialBackoffRetryPolicy implements 
RetryPolicy {
     }
 
     @Override
-    public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy 
retryPolicy) {
+    public RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy 
retryPolicy) {
         if (!EXPONENTIAL_BACKOFF.equals(retryPolicy.getStrategyCase())) {
             throw new IllegalArgumentException("strategy must be exponential 
backoff");
         }
-        return updateBackoff(retryPolicy.getExponentialBackoff());
+        return inheritBackoff(retryPolicy.getExponentialBackoff());
     }
 
-    private RetryPolicy updateBackoff(ExponentialBackoff backoff) {
+    private RetryPolicy inheritBackoff(ExponentialBackoff backoff) {
         return new ExponentialBackoffRetryPolicy(maxAttempts,
             Duration.ofNanos(Durations.toNanos(backoff.getInitial())),
             Duration.ofNanos(Durations.toNanos(backoff.getMax())),
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
index 43d9fa2..e6ed336 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
@@ -44,7 +44,10 @@ public interface RetryPolicy {
      * @param retryPolicy retry policy which contains the backoff strategy.
      * @return the new retry policy.
      */
-    RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
+    RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
 
+    /**
+     * Convert to {@link apache.rocketmq.v2.RetryPolicy}.
+     */
     apache.rocketmq.v2.RetryPolicy toProtobuf();
 }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 8b4985c..ef94469 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -17,186 +17,98 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import apache.rocketmq.v2.Broker;
-import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.MessageType;
 import apache.rocketmq.v2.Permission;
-import apache.rocketmq.v2.Publishing;
-import apache.rocketmq.v2.QueryRouteRequest;
-import apache.rocketmq.v2.QueryRouteResponse;
 import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.SendMessageRequest;
-import apache.rocketmq.v2.SendMessageResponse;
-import apache.rocketmq.v2.Settings;
-import apache.rocketmq.v2.Status;
-import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.Metadata;
-import io.grpc.stub.StreamObserver;
-import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
-import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class ProducerImplTest extends TestBase {
-    @Mock
-    private ClientManagerImpl clientManager;
-    @Mock
-    private StreamObserver<TelemetryCommand> telemetryRequestObserver;
-
     private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
         .setEndpoints(FAKE_ACCESS_POINT).build();
 
-    private final String[] str = {FAKE_TOPIC_0};
-    private final Set<String> set = new HashSet<>(Arrays.asList(str));
-
-    @InjectMocks
-    private final ProducerImpl producer = new 
ProducerImpl(clientConfiguration, set, 1, null);
-
-    @InjectMocks
-    private final ProducerImpl producerWithoutTopicBinding = new 
ProducerImpl(clientConfiguration, new HashSet<>(), 1,
-        null);
-
-    private void start(ProducerImpl producer) throws ClientException {
-        SettableFuture<RpcInvocation<QueryRouteResponse>> future0 = 
SettableFuture.create();
-        Status status = Status.newBuilder().setCode(Code.OK).build();
+    @SuppressWarnings("SameParameterValue")
+    private ProducerImpl createProducerWithTopic(String topic) {
         List<MessageQueue> messageQueueList = new ArrayList<>();
-        MessageQueue mq = 
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
+        MessageQueue mq = 
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(topic))
             .setPermission(Permission.READ_WRITE)
+            .addAcceptMessageTypes(MessageType.NORMAL)
             
.setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
             .setId(0).build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = 
QueryRouteResponse.newBuilder().setStatus(status)
-            .addAllMessageQueues(messageQueueList).build();
-        final RpcInvocation<QueryRouteResponse> rpcInvocation =
-            new RpcInvocation<>(response, fakeRpcContext());
-        future0.set(rpcInvocation);
-        when(clientManager.queryRoute(any(Endpoints.class), 
any(Metadata.class), any(QueryRouteRequest.class),
-            any(Duration.class)))
-            .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class)))
-            .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
-            "TestScheduler"));
-        when(clientManager.getScheduler()).thenReturn(scheduler);
-        
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
-
-        int messageMaxBodySize = 1024 * 1024 * 4;
-        Publishing publishing = 
Publishing.newBuilder().setMaxBodySize(messageMaxBodySize).build();
-        Settings settings = 
Settings.newBuilder().setPublishing(publishing).build();
-        final Service service = producer.startAsync();
-        producer.getClientSettings().applySettingsCommand(settings);
-        service.awaitRunning();
-    }
-
-    private void shutdown(ProducerImpl producer) {
-        final Service clientManagerService = mock(Service.class);
-        when(clientManager.stopAsync()).thenReturn(clientManagerService);
-        doNothing().when(clientManagerService).awaitTerminated();
-        producer.stopAsync().awaitTerminated();
+        final TopicRouteData topicRouteData = new 
TopicRouteData(messageQueueList);
+        final PublishingLoadBalancer publishingLoadBalancer = new 
PublishingLoadBalancer(topicRouteData);
+        final Set<String> set = new HashSet<>();
+        set.add(topic);
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
+        producer.publishingRouteDataCache.put(topic, publishingLoadBalancer);
+        final Service mockedService = mock(Service.class);
+        Mockito.doReturn(mockedService).when(producer).startAsync();
+        Mockito.doReturn(mockedService).when(producer).stopAsync();
+        Mockito.doReturn(true).when(producer).isRunning();
+        producer.startAsync().awaitRunning();
+        return producer;
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testSendWithoutStart() throws ClientException {
+    public void testSendBeforeStartup() throws ClientException {
+        final Set<String> set = Collections.singleton(FAKE_TOPIC_0);
+        final ProducerImpl producer = Mockito.spy(new 
ProducerImpl(clientConfiguration, set, 1, null));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         producer.send(message);
     }
 
     @Test
-    @Ignore
-    public void testSendWithTopicBinding() throws ClientException, 
ExecutionException, InterruptedException {
-        start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), 
any(Metadata.class),
-            any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), 
any(Metadata.class),
-            any(Duration.class), any(ClientSessionImpl.class));
+    public void testSendWithTopic() throws Exception {
+        final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0);
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
-            okSendMessageResponseFutureWithSingleEntry();
-        when(clientManager.sendMessage(any(Endpoints.class), 
any(Metadata.class), any(SendMessageRequest.class),
-            any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get().getResponse();
-        assertEquals(1, response.getEntriesCount());
-        final apache.rocketmq.v2.SendResultEntry receipt = 
response.getEntriesList().iterator().next();
-        final SendReceipt sendReceipt = producer.send(message);
-        assertEquals(receipt.getMessageId(), 
sendReceipt.getMessageId().toString());
-        shutdown(producer);
+        final MessageQueueImpl messageQueue = 
fakeMessageQueueImpl(FAKE_TOPIC_0);
+        final SendReceiptImpl sendReceiptImpl = 
fakeSendReceiptImpl(messageQueue);
+        
Mockito.doReturn(Futures.immediateFuture(Collections.singletonList(sendReceiptImpl)))
+            .when(producer).send0(any(Metadata.class), any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
+        producer.send(message);
+        verify(producer, times(1)).send0(any(Metadata.class), 
any(Endpoints.class), anyList(),
+            any(MessageQueueImpl.class));
+        producer.close();
     }
 
-    @Test
-    @Ignore
-    public void testSendWithoutTopicBinding() throws ClientException, 
ExecutionException, InterruptedException {
-        start(producerWithoutTopicBinding);
-        verify(clientManager, never()).queryRoute(any(Endpoints.class), 
any(Metadata.class),
-            any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, never()).telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class));
+    @Test(expected = IllegalArgumentException.class)
+    public void testSendFailureWithTopic() throws ClientException {
+        final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0);
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
-            okSendMessageResponseFutureWithSingleEntry();
-        when(clientManager.sendMessage(any(Endpoints.class), 
any(Metadata.class), any(SendMessageRequest.class),
-            any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get().getResponse();
-        assertEquals(1, response.getEntriesCount());
-        final SendReceipt sendReceipt = 
producerWithoutTopicBinding.send(message);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), 
any(Metadata.class),
-            any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), 
any(Metadata.class),
-            any(Duration.class), any(ClientSessionImpl.class));
-        final apache.rocketmq.v2.SendResultEntry receipt = 
response.getEntriesList().iterator().next();
-        assertEquals(receipt.getMessageId(), 
sendReceipt.getMessageId().toString());
-        shutdown(producerWithoutTopicBinding);
-    }
-
-    @Test(expected = ClientException.class)
-    @Ignore
-    public void testSendMessageWithFailure() throws ClientException {
-        start(producer);
-        verify(clientManager, times(1)).queryRoute(any(Endpoints.class), 
any(Metadata.class),
-            any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, times(1)).telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class));
-        final ListenableFuture<RpcInvocation<SendMessageResponse>> future = 
failureSendMessageResponseFuture();
-        when(clientManager.sendMessage(any(Endpoints.class), 
any(Metadata.class), any(SendMessageRequest.class),
-            any(Duration.class))).thenReturn(future);
-        Message message0 = fakeMessage(FAKE_TOPIC_0);
-        try {
-            producer.send(message0);
-        } finally {
-            shutdown(producer);
-        }
+        final Exception exception = new IllegalArgumentException();
+        Mockito.doReturn(Futures.immediateFailedFuture(exception))
+            .when(producer).send0(any(Metadata.class), any(Endpoints.class), 
anyList(), any(MessageQueueImpl.class));
+        producer.send(message);
+        final int maxAttempts = 
producer.producerSettings.getRetryPolicy().getMaxAttempts();
+        verify(producer, times(maxAttempts)).send0(any(Metadata.class), 
any(Endpoints.class), anyList(),
+            any(MessageQueueImpl.class));
+        producer.close();
     }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
index 0faaf23..d5a92d4 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
@@ -21,11 +21,13 @@ import static 
apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
 import static org.junit.Assert.assertEquals;
 
 import apache.rocketmq.v2.CustomizedBackoff;
-import apache.rocketmq.v2.RetryPolicy;
+import apache.rocketmq.v2.ExponentialBackoff;
 import com.google.protobuf.util.Durations;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class CustomizedBackoffRetryPolicyTest {
@@ -46,6 +48,51 @@ public class CustomizedBackoffRetryPolicyTest {
         assertEquals(duration1, policy.getNextAttemptDelay(4));
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetNextAttemptDelayWithIllegalAttempt() {
+        int maxAttempt = 3;
+        Duration duration0 = Duration.ofSeconds(1);
+        Duration duration1 = Duration.ofSeconds(2);
+        List<Duration> durations = new ArrayList<>();
+        durations.add(duration0);
+        durations.add(duration1);
+        final CustomizedBackoffRetryPolicy retryPolicy = new 
CustomizedBackoffRetryPolicy(durations, maxAttempt);
+        retryPolicy.getNextAttemptDelay(0);
+    }
+
+    @Test
+    public void testFromProtobuf() {
+        com.google.protobuf.Duration duration0 = Durations.fromSeconds(1);
+        com.google.protobuf.Duration duration1 = Durations.fromSeconds(2);
+        com.google.protobuf.Duration duration2 = Durations.fromSeconds(3);
+        List<com.google.protobuf.Duration> durations = new ArrayList<>();
+        durations.add(duration0);
+        durations.add(duration1);
+        durations.add(duration2);
+        CustomizedBackoff customizedBackoff = 
CustomizedBackoff.newBuilder().addAllNext(durations).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy =
+            
apache.rocketmq.v2.RetryPolicy.newBuilder().setCustomizedBackoff(customizedBackoff).setMaxAttempts(3)
+                .build();
+        CustomizedBackoffRetryPolicy retryPolicy0 = 
CustomizedBackoffRetryPolicy.fromProtobuf(retryPolicy);
+        final List<Duration> durations0 = durations.stream()
+            .map(duration -> 
Duration.ofNanos(Durations.toNanos(duration))).collect(Collectors.toList());
+        Assert.assertEquals(retryPolicy0.getDurations(), durations0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromProtobufWithoutCustomizedBackoff() {
+        com.google.protobuf.Duration initialBackoff0 = 
Durations.fromSeconds(1);
+        com.google.protobuf.Duration maxBackoff0 = Durations.fromSeconds(1);
+        float backoffMultiplier = 1.0f;
+        int maxAttempts = 3;
+        ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
+            
.setInitial(initialBackoff0).setMax(maxBackoff0).setMultiplier(backoffMultiplier).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            .setMaxAttempts(maxAttempts)
+            .setExponentialBackoff(exponentialBackoff).build();
+        CustomizedBackoffRetryPolicy.fromProtobuf(retryPolicy);
+    }
+
     @Test
     public void testToProtobuf() {
         int maxAttempt = 3;
@@ -55,7 +102,7 @@ public class CustomizedBackoffRetryPolicyTest {
         durations.add(duration0);
         durations.add(duration1);
         final CustomizedBackoffRetryPolicy policy = new 
CustomizedBackoffRetryPolicy(durations, maxAttempt);
-        final RetryPolicy protobuf = policy.toProtobuf();
+        final apache.rocketmq.v2.RetryPolicy protobuf = policy.toProtobuf();
         assertEquals(maxAttempt, protobuf.getMaxAttempts());
         assertEquals(CUSTOMIZED_BACKOFF, protobuf.getStrategyCase());
         final CustomizedBackoff backoff = protobuf.getCustomizedBackoff();
@@ -65,4 +112,54 @@ public class CustomizedBackoffRetryPolicyTest {
             assertEquals(durations.get(i).toNanos(), 
Durations.toNanos(next.get(i)));
         }
     }
+
+    @Test
+    public void testInheritBackoff() {
+        List<com.google.protobuf.Duration> durations0 = new ArrayList<>();
+        com.google.protobuf.Duration duration0 = Durations.fromSeconds(1);
+        com.google.protobuf.Duration duration1 = Durations.fromSeconds(2);
+        com.google.protobuf.Duration duration2 = Durations.fromSeconds(3);
+        durations0.add(duration0);
+        durations0.add(duration1);
+        durations0.add(duration2);
+        int maxAttempt0 = 5;
+        CustomizedBackoff customizedBackoff = 
CustomizedBackoff.newBuilder().addAllNext(durations0).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            .setCustomizedBackoff(customizedBackoff)
+            .setMaxAttempts(maxAttempt0)
+            .build();
+        int maxAttempt = 3;
+        Duration duration3 = Duration.ofSeconds(3);
+        Duration duration4 = Duration.ofSeconds(2);
+        Duration duration5 = Duration.ofSeconds(1);
+        List<Duration> durations1 = new ArrayList<>();
+        durations1.add(duration3);
+        durations1.add(duration4);
+        durations1.add(duration5);
+        final CustomizedBackoffRetryPolicy retryPolicy0 = new 
CustomizedBackoffRetryPolicy(durations1, maxAttempt);
+        final RetryPolicy retryPolicy1 = 
retryPolicy0.inheritBackoff(retryPolicy);
+        Assert.assertTrue(retryPolicy1 instanceof 
CustomizedBackoffRetryPolicy);
+        CustomizedBackoffRetryPolicy customizedBackoffRetryPolicy = 
(CustomizedBackoffRetryPolicy) retryPolicy1;
+        final List<Duration> durations2 = durations0.stream()
+            .map(duration -> 
Duration.ofNanos(Durations.toNanos(duration))).collect(Collectors.toList());
+        Assert.assertEquals(customizedBackoffRetryPolicy.getDurations(), 
durations2);
+        Assert.assertEquals(customizedBackoffRetryPolicy.getMaxAttempts(), 
maxAttempt);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInheritBackoffWithoutCustomizedBackoff() {
+        int maxAttempt = 3;
+        Duration duration3 = Duration.ofSeconds(3);
+        Duration duration4 = Duration.ofSeconds(2);
+        Duration duration5 = Duration.ofSeconds(1);
+        List<Duration> durations1 = new ArrayList<>();
+        durations1.add(duration3);
+        durations1.add(duration4);
+        durations1.add(duration5);
+        final CustomizedBackoffRetryPolicy retryPolicy0 = new 
CustomizedBackoffRetryPolicy(durations1, maxAttempt);
+        ExponentialBackoff exponentialBackoff = 
ExponentialBackoff.newBuilder().build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            .setExponentialBackoff(exponentialBackoff).build();
+        retryPolicy0.inheritBackoff(retryPolicy);
+    }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicyTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicyTest.java
new file mode 100644
index 0000000..c8b2e5f
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicyTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.retry;
+
+import apache.rocketmq.v2.CustomizedBackoff;
+import apache.rocketmq.v2.ExponentialBackoff;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExponentialBackoffRetryPolicyTest extends TestBase {
+
+    @Test
+    public void testNextAttemptDelayForImmediatelyRetryPolicy() {
+        final ExponentialBackoffRetryPolicy retryPolicy = 
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(3);
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(1), Duration.ZERO);
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(2), Duration.ZERO);
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(3), Duration.ZERO);
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(4), Duration.ZERO);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testGetNextAttemptDelayWithIllegalAttempt() {
+        Duration initialBackoff = Duration.ofMillis(5);
+        Duration maxBackoff = Duration.ofSeconds(1);
+        double backoffMultiplier = 5;
+        final ExponentialBackoffRetryPolicy retryPolicy = new 
ExponentialBackoffRetryPolicy(3,
+            initialBackoff, maxBackoff, backoffMultiplier);
+        retryPolicy.getNextAttemptDelay(0);
+    }
+
+    @Test
+    public void testGetNextAttemptDelay() {
+        Duration initialBackoff = Duration.ofMillis(5);
+        Duration maxBackoff = Duration.ofSeconds(1);
+        double backoffMultiplier = 5;
+        final ExponentialBackoffRetryPolicy retryPolicy = new 
ExponentialBackoffRetryPolicy(3,
+            initialBackoff, maxBackoff, backoffMultiplier);
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(1), 
Duration.ofMillis(5));
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(2), 
Duration.ofMillis(25));
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(3), 
Duration.ofMillis(125));
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(4), 
Duration.ofMillis(625));
+        Assert.assertEquals(retryPolicy.getNextAttemptDelay(5), 
Duration.ofSeconds(1));
+    }
+
+    @Test
+    public void testFromProtobuf() {
+        final Duration initialBackoff = Duration.ofMillis(5);
+        final Duration maxBackoff = Duration.ofSeconds(1);
+        com.google.protobuf.Duration initialBackoff0 = 
Durations.fromNanos(initialBackoff.toNanos());
+        com.google.protobuf.Duration maxBackoff0 = 
Durations.fromNanos(maxBackoff.toNanos());
+        float backoffMultiplier = 5;
+        int maxAttempts = 3;
+        ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
+            
.setInitial(initialBackoff0).setMax(maxBackoff0).setMultiplier(backoffMultiplier).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            .setMaxAttempts(maxAttempts)
+            .setExponentialBackoff(exponentialBackoff).build();
+        ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy =
+            ExponentialBackoffRetryPolicy.fromProtobuf(retryPolicy);
+        Assert.assertEquals(exponentialBackoffRetryPolicy.getMaxAttempts(), 
maxAttempts);
+        Assert.assertEquals(exponentialBackoffRetryPolicy.getInitialBackoff(), 
initialBackoff);
+        Assert.assertEquals(exponentialBackoffRetryPolicy.getMaxBackoff(), 
maxBackoff);
+        
Assert.assertEquals(exponentialBackoffRetryPolicy.getBackoffMultiplier(), 
backoffMultiplier, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testFromProtobufWithoutExponentialBackoff() {
+        int maxAttempts = 3;
+        CustomizedBackoff customizedBackoff = 
CustomizedBackoff.newBuilder().addNext(Durations.fromSeconds(1)).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            
.setMaxAttempts(maxAttempts).setCustomizedBackoff(customizedBackoff).build();
+        ExponentialBackoffRetryPolicy.fromProtobuf(retryPolicy);
+    }
+
+    @Test
+    public void testToProtobuf() {
+        Duration initialBackoff = Duration.ofMillis(5);
+        Duration maxBackoff = Duration.ofSeconds(1);
+        double backoffMultiplier = 5;
+        int maxAttempts = 3;
+        final ExponentialBackoffRetryPolicy retryPolicy = new 
ExponentialBackoffRetryPolicy(maxAttempts,
+            initialBackoff, maxBackoff, backoffMultiplier);
+        final apache.rocketmq.v2.RetryPolicy retryPolicy0 = 
retryPolicy.toProtobuf();
+        Assert.assertTrue(retryPolicy0.hasExponentialBackoff());
+        final ExponentialBackoff exponentialBackoff = 
retryPolicy0.getExponentialBackoff();
+        com.google.protobuf.Duration initialBackoff0 = 
Durations.fromNanos(initialBackoff.toNanos());
+        com.google.protobuf.Duration maxBackoff0 = 
Durations.fromNanos(maxBackoff.toNanos());
+        Assert.assertEquals(exponentialBackoff.getInitial(), initialBackoff0);
+        Assert.assertEquals(exponentialBackoff.getMax(), maxBackoff0);
+        Assert.assertEquals(exponentialBackoff.getMultiplier(), 
backoffMultiplier, 0);
+        Assert.assertEquals(retryPolicy0.getMaxAttempts(), maxAttempts);
+    }
+
+    @Test
+    public void testInheritBackoff() {
+        Duration initialBackoff = Duration.ofMillis(5);
+        Duration maxBackoff = Duration.ofSeconds(1);
+        double backoffMultiplier = 5;
+        int maxAttempts = 3;
+        final ExponentialBackoffRetryPolicy retryPolicy = new 
ExponentialBackoffRetryPolicy(maxAttempts,
+            initialBackoff, maxBackoff, backoffMultiplier);
+
+        Duration initialBackoff0 = Duration.ofMillis(10);
+        Duration maxBackoff0 = Duration.ofSeconds(3);
+        double backoffMultiplier0 = 10;
+        ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
+            .setInitial(Durations.fromNanos(initialBackoff0.toNanos()))
+            
.setMax(Durations.fromNanos(maxBackoff0.toNanos())).setMultiplier((float) 
backoffMultiplier0).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy0 = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            .setExponentialBackoff(exponentialBackoff).build();
+        final RetryPolicy retryPolicy1 = 
retryPolicy.inheritBackoff(retryPolicy0);
+        Assert.assertTrue(retryPolicy1 instanceof 
ExponentialBackoffRetryPolicy);
+        ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy = 
(ExponentialBackoffRetryPolicy) retryPolicy1;
+        Assert.assertEquals(exponentialBackoffRetryPolicy.getInitialBackoff(), 
initialBackoff0);
+        Assert.assertEquals(exponentialBackoffRetryPolicy.getMaxBackoff(), 
maxBackoff0);
+        
Assert.assertEquals(exponentialBackoffRetryPolicy.getBackoffMultiplier(), 
backoffMultiplier0, 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInheritBackoffWithoutExponentialBackoff() {
+        int maxAttempts = 3;
+        CustomizedBackoff customizedBackoff = 
CustomizedBackoff.newBuilder().addNext(Durations.fromSeconds(1)).build();
+        apache.rocketmq.v2.RetryPolicy retryPolicy = 
apache.rocketmq.v2.RetryPolicy.newBuilder()
+            
.setMaxAttempts(maxAttempts).setCustomizedBackoff(customizedBackoff).build();
+
+        Duration initialBackoff = Duration.ofMillis(5);
+        Duration maxBackoff = Duration.ofSeconds(1);
+        double backoffMultiplier = 5;
+        final ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy =
+            new ExponentialBackoffRetryPolicy(maxAttempts,
+                initialBackoff, maxBackoff, backoffMultiplier);
+        exponentialBackoffRetryPolicy.inheritBackoff(retryPolicy);
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index b7de438..cd4cb85 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -178,6 +178,10 @@ public class TestBase {
             keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0, 
null, 1, corrupted, null);
     }
 
+    protected MessageQueueImpl fakeMessageQueueImpl(String topic) {
+        return new 
MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
+    }
+
     protected MessageQueueImpl fakeMessageQueueImpl0() {
         return new MessageQueueImpl(fakePbMessageQueue0());
     }
@@ -363,7 +367,7 @@ public class TestBase {
         MessageQueueImpl mq) throws ExecutionException, InterruptedException, 
ClientException {
         final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
-        final List<SendReceiptImpl> receipts = 
SendReceiptImpl.processSendMessageResponseInvocation(mq, future.get());
+        final List<SendReceiptImpl> receipts = 
SendReceiptImpl.processResponseInvocation(mq, future.get());
         return receipts.iterator().next();
     }
 }
diff --git a/java/pom.xml b/java/pom.xml
index 89b1120..64168d0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -290,6 +290,9 @@
                     </execution>
                 </executions>
                 <configuration>
+                    <excludes>
+                        <exclude>**/example/**/*.class</exclude>
+                    </excludes>
                     <rules>
                         <rule>
                             <element>CLASS</element>

Reply via email to