This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 7830691 Add more unit test (#146)
7830691 is described below
commit 783069154f4e39c9a96118b3e147643a57ef9b75
Author: Aaron Ai <[email protected]>
AuthorDate: Tue Aug 9 15:33:01 2022 +0800
Add more unit test (#146)
---
.../rocketmq/client/java/impl/ClientImpl.java | 2 +-
.../client/java/impl/producer/ProducerImpl.java | 67 ++++----
.../java/impl/producer/ProducerSettings.java | 2 +-
.../client/java/impl/producer/SendReceiptImpl.java | 2 +-
.../client/java/message/PublishingMessageImpl.java | 5 +-
.../java/retry/CustomizedBackoffRetryPolicy.java | 10 +-
.../java/retry/ExponentialBackoffRetryPolicy.java | 18 +-
.../rocketmq/client/java/retry/RetryPolicy.java | 5 +-
.../java/impl/producer/ProducerImplTest.java | 182 ++++++---------------
.../retry/CustomizedBackoffRetryPolicyTest.java | 101 +++++++++++-
.../retry/ExponentialBackoffRetryPolicyTest.java | 152 +++++++++++++++++
.../apache/rocketmq/client/java/tool/TestBase.java | 6 +-
java/pom.xml | 3 +
13 files changed, 372 insertions(+), 183 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index d67ae57..0a3257a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -621,7 +621,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
return future;
}
- ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
+ protected ListenableFuture<TopicRouteData> fetchTopicRoute0(final String
topic) {
try {
Resource topicResource =
Resource.newBuilder().setName(topic).build();
final QueryRouteRequest request =
QueryRouteRequest.newBuilder().setTopic(topicResource)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index c40cf9c..59a2bd9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -88,9 +88,8 @@ class ProducerImpl extends ClientImpl implements Producer {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProducerImpl.class);
protected final ProducerSettings producerSettings;
-
+ final ConcurrentMap<String/* topic */, PublishingLoadBalancer>
publishingRouteDataCache;
private final TransactionChecker checker;
- private final ConcurrentMap<String/* topic */, PublishingLoadBalancer>
publishingRouteDataCache;
/**
* The caller is supposed to have validated the arguments and handled
throwing exception or
@@ -197,7 +196,7 @@ class ProducerImpl extends ClientImpl implements Producer {
*/
@Override
public SendReceipt send(Message message) throws ClientException {
- final ListenableFuture<SendReceipt> future =
Futures.transform(send0(Collections.singletonList(message), false),
+ final ListenableFuture<SendReceipt> future =
Futures.transform(send(Collections.singletonList(message), false),
sendReceipts -> sendReceipts.iterator().next(),
MoreExecutors.directExecutor());
return handleClientFuture(future);
}
@@ -217,7 +216,7 @@ class ProducerImpl extends ClientImpl implements Producer {
} catch (Throwable t) {
throw new ClientException(t);
}
- final ListenableFuture<List<SendReceiptImpl>> future =
send0(Collections.singletonList(publishingMessage),
+ final ListenableFuture<List<SendReceiptImpl>> future =
send(Collections.singletonList(publishingMessage),
true);
final List<SendReceiptImpl> receipts = handleClientFuture(future);
final SendReceiptImpl sendReceipt = receipts.iterator().next();
@@ -230,7 +229,7 @@ class ProducerImpl extends ClientImpl implements Producer {
*/
@Override
public CompletableFuture<SendReceipt> sendAsync(Message message) {
- final ListenableFuture<SendReceipt> future =
Futures.transform(send0(Collections.singletonList(message), false),
+ final ListenableFuture<SendReceipt> future =
Futures.transform(send(Collections.singletonList(message), false),
sendReceipts -> sendReceipts.iterator().next(),
MoreExecutors.directExecutor());
return FutureConverter.toCompletableFuture(future);
}
@@ -329,7 +328,7 @@ class ProducerImpl extends ClientImpl implements Producer {
return result.takeMessageQueues(isolated,
this.getRetryPolicy().getMaxAttempts());
}
- private ListenableFuture<List<SendReceiptImpl>> send0(List<Message>
messages, boolean txEnabled) {
+ private ListenableFuture<List<SendReceiptImpl>> send(List<Message>
messages, boolean txEnabled) {
SettableFuture<List<SendReceiptImpl>> future = SettableFuture.create();
// Check producer state before message publishing.
@@ -419,52 +418,52 @@ class ProducerImpl extends ClientImpl implements Producer
{
/**
* The caller is supposed to make sure different messages have the same
message type and same topic.
*/
- private SendMessageRequest
wrapSendMessageRequest(List<PublishingMessageImpl> messages) {
- return SendMessageRequest.newBuilder()
-
.addAllMessages(messages.stream().map(PublishingMessageImpl::toProtobuf).collect(Collectors.toList()))
- .build();
+ private SendMessageRequest
wrapSendMessageRequest(List<PublishingMessageImpl> pubMessages,
MessageQueueImpl mq) {
+ final List<apache.rocketmq.v2.Message> messages = pubMessages.stream()
+ .map(publishingMessage ->
publishingMessage.toProtobuf(mq)).collect(Collectors.toList());
+ return
SendMessageRequest.newBuilder().addAllMessages(messages).build();
}
- private void send0(SettableFuture<List<SendReceiptImpl>> future, String
topic, MessageType messageType,
+ ListenableFuture<List<SendReceiptImpl>> send0(Metadata metadata, Endpoints
endpoints,
+ List<PublishingMessageImpl> pubMessages, MessageQueueImpl mq) {
+ final SendMessageRequest request = wrapSendMessageRequest(pubMessages,
mq);
+ final ListenableFuture<RpcInvocation<SendMessageResponse>> future0 =
+ clientManager.sendMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
+ return Futures.transformAsync(future0,
+ invocation ->
Futures.immediateFuture(SendReceiptImpl.processResponseInvocation(mq,
invocation)),
+ MoreExecutors.directExecutor());
+ }
+
+ private void send0(SettableFuture<List<SendReceiptImpl>> future0, String
topic, MessageType messageType,
final List<MessageQueueImpl> candidates, final
List<PublishingMessageImpl> messages, final int attempt) {
Metadata metadata;
try {
metadata = sign();
} catch (Throwable t) {
// Failed to sign, no need to proceed.
- future.setException(t);
+ future0.setException(t);
return;
}
// Calculate the current message queue.
- final MessageQueueImpl messageQueue =
candidates.get(IntMath.mod(attempt - 1, candidates.size()));
- final List<MessageType> acceptMessageTypes =
messageQueue.getAcceptMessageTypes();
+ final MessageQueueImpl mq = candidates.get(IntMath.mod(attempt - 1,
candidates.size()));
+ final List<MessageType> acceptMessageTypes =
mq.getAcceptMessageTypes();
if (producerSettings.isValidateMessageType() &&
!acceptMessageTypes.contains(messageType)) {
final IllegalArgumentException e = new
IllegalArgumentException("Current message type not match with "
+ "topic accept message types, topic=" + topic + ",
actualMessageType=" + messageType + ", "
+ "acceptMessageTypes=" + acceptMessageTypes);
- future.setException(e);
+ future0.setException(e);
return;
}
- final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
- final SendMessageRequest request = wrapSendMessageRequest(messages);
-
- final ListenableFuture<RpcInvocation<SendMessageResponse>>
responseFuture =
- clientManager.sendMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
-
- final ListenableFuture<List<SendReceiptImpl>> attemptFuture =
Futures.transformAsync(responseFuture,
- invocation ->
Futures.immediateFuture(SendReceiptImpl.processSendMessageResponseInvocation(messageQueue,
- invocation)),
- MoreExecutors.directExecutor());
-
+ final Endpoints endpoints = mq.getBroker().getEndpoints();
+ final ListenableFuture<List<SendReceiptImpl>> future = send0(metadata,
endpoints, messages, mq);
final int maxAttempts = this.getRetryPolicy().getMaxAttempts();
-
// Intercept before message publishing.
final Stopwatch stopwatch = Stopwatch.createStarted();
final List<MessageCommon> messageCommons =
messages.stream().map(PublishingMessageImpl::getMessageCommon).collect(Collectors.toList());
doBefore(MessageHookPoints.SEND, messageCommons);
- Futures.addCallback(attemptFuture, new
FutureCallback<List<SendReceiptImpl>>() {
+ Futures.addCallback(future, new
FutureCallback<List<SendReceiptImpl>>() {
@Override
public void onSuccess(List<SendReceiptImpl> sendReceipts) {
// Intercept after message publishing.
@@ -475,11 +474,11 @@ class ProducerImpl extends ClientImpl implements Producer
{
final InternalErrorException e = new
InternalErrorException("[Bug] due to an"
+ " unknown reason from remote, received send
receipt's quantity " + sendReceipts.size()
+ " is not equal to sent message's quantity " +
messages.size());
- future.setException(e);
+ future0.setException(e);
return;
}
// No need more attempts.
- future.set(sendReceipts);
+ future0.set(sendReceipts);
// Resend message(s) successfully.
if (1 < attempt) {
// Collect messageId(s) for logging.
@@ -509,7 +508,7 @@ class ProducerImpl extends ClientImpl implements Producer {
isolate(endpoints);
if (attempt >= maxAttempts) {
// No need more attempts.
- future.setException(t);
+ future0.setException(t);
LOGGER.error("Failed to send message(s) finally, run out
of attempt times, maxAttempts={}, " +
"attempt={}, topic={}, messageId(s)={},
endpoints={}, clientId={}",
maxAttempts, attempt, topic, messageIds, endpoints,
clientId, t);
@@ -517,7 +516,7 @@ class ProducerImpl extends ClientImpl implements Producer {
}
// No need more attempts for transactional message.
if (MessageType.TRANSACTION.equals(messageType)) {
- future.setException(t);
+ future0.setException(t);
LOGGER.error("Failed to send transactional message
finally, maxAttempts=1, attempt={}, " +
"topic={}, messageId(s)={}, endpoints={},
clientId={}", attempt, topic, messageIds,
endpoints, clientId, t);
@@ -530,14 +529,14 @@ class ProducerImpl extends ClientImpl implements Producer
{
LOGGER.warn("Failed to send message, would attempt to
resend right now, maxAttempts={}, "
+ "attempt={}, topic={}, messageId(s)={},
endpoints={}, clientId={}", maxAttempts, attempt,
topic, messageIds, endpoints, clientId, t);
- send0(future, topic, messageType, candidates, messages,
nextAttempt);
+ send0(future0, topic, messageType, candidates, messages,
nextAttempt);
return;
}
final Duration delay =
ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
LOGGER.warn("Failed to send message due to too many requests,
would attempt to resend after {}, "
+ "maxAttempts={}, attempt={}, topic={},
messageId(s)={}, endpoints={}, clientId={}", delay,
maxAttempts, attempt, topic, messageIds, endpoints,
clientId, t);
- clientManager.getScheduler().schedule(() -> send0(future,
topic, messageType, candidates, messages,
+ clientManager.getScheduler().schedule(() -> send0(future0,
topic, messageType, candidates, messages,
nextAttempt), delay.toNanos(), TimeUnit.NANOSECONDS);
}
}, clientCallbackExecutor);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 687fb96..4919927 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -81,7 +81,7 @@ public class ProducerSettings extends ClientSettings {
final apache.rocketmq.v2.RetryPolicy backoffPolicy =
settings.getBackoffPolicy();
final Publishing publishing = settings.getPublishing();
RetryPolicy exist = retryPolicy;
- this.retryPolicy = exist.updateBackoff(backoffPolicy);
+ this.retryPolicy = exist.inheritBackoff(backoffPolicy);
this.validateMessageType =
settings.getPublishing().getValidateMessageType();
this.maxBodySizeBytes = publishing.getMaxBodySize();
this.arrivedFuture.setFuture(Futures.immediateVoidFuture());
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 2ed3cc7..114e789 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -70,7 +70,7 @@ public class SendReceiptImpl implements SendReceipt {
return offset;
}
- public static List<SendReceiptImpl>
processSendMessageResponseInvocation(MessageQueueImpl mq,
+ public static List<SendReceiptImpl>
processResponseInvocation(MessageQueueImpl mq,
RpcInvocation<SendMessageResponse> invocation) throws ClientException {
final SendMessageResponse response = invocation.getResponse();
Status status = response.getStatus();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
index 72afd9a..5edfab1 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/message/PublishingMessageImpl.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.impl.producer.ProducerSettings;
import org.apache.rocketmq.client.java.message.protocol.Encoding;
import org.apache.rocketmq.client.java.misc.Utilities;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
/**
* This class is a publishing view for message, which could be considered as
an extension of {@link MessageImpl}.
@@ -97,7 +98,7 @@ public class PublishingMessageImpl extends MessageImpl {
* <p>This method should be invoked before each message sending, because
the born time is reset before each
* invocation, which means that it should not be invoked ahead of time.
*/
- public apache.rocketmq.v2.Message toProtobuf() {
+ public apache.rocketmq.v2.Message toProtobuf(MessageQueueImpl mq) {
final apache.rocketmq.v2.SystemProperties.Builder
systemPropertiesBuilder =
apache.rocketmq.v2.SystemProperties.newBuilder()
// Message keys
@@ -110,6 +111,8 @@ public class PublishingMessageImpl extends MessageImpl {
.setBornHost(Utilities.hostName())
// Body encoding
.setBodyEncoding(Encoding.toProtobuf(Encoding.IDENTITY))
+ // Queue id
+ .setQueueId(mq.getQueueId())
// Message type
.setMessageType(MessageType.toProtobuf(messageType));
// Message tag
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
index 59b5e4c..a1cf11a 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicy.java
@@ -42,6 +42,10 @@ public class CustomizedBackoffRetryPolicy implements
RetryPolicy {
return maxAttempts;
}
+ List<Duration> getDurations() {
+ return durations;
+ }
+
@Override
public Duration getNextAttemptDelay(int attempt) {
checkArgument(attempt > 0, "attempt must be positive");
@@ -68,14 +72,14 @@ public class CustomizedBackoffRetryPolicy implements
RetryPolicy {
}
@Override
- public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy
retryPolicy) {
+ public RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy
retryPolicy) {
if (!CUSTOMIZED_BACKOFF.equals(retryPolicy.getStrategyCase())) {
throw new IllegalArgumentException("strategy must be customized
backoff");
}
- return updateBackoff(retryPolicy.getCustomizedBackoff());
+ return inheritBackoff(retryPolicy.getCustomizedBackoff());
}
- private RetryPolicy updateBackoff(CustomizedBackoff backoff) {
+ private RetryPolicy inheritBackoff(CustomizedBackoff backoff) {
final List<Duration> durations = backoff.getNextList().stream()
.map(duration -> Duration.ofNanos(Durations.toNanos(duration)))
.collect(Collectors.toList());
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
index ed82dfe..cd8c256 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicy.java
@@ -55,6 +55,18 @@ public class ExponentialBackoffRetryPolicy implements
RetryPolicy {
return maxAttempts;
}
+ Duration getInitialBackoff() {
+ return initialBackoff;
+ }
+
+ Duration getMaxBackoff() {
+ return maxBackoff;
+ }
+
+ double getBackoffMultiplier() {
+ return backoffMultiplier;
+ }
+
@Override
public Duration getNextAttemptDelay(int attempt) {
checkArgument(attempt > 0, "attempt must be positive");
@@ -88,14 +100,14 @@ public class ExponentialBackoffRetryPolicy implements
RetryPolicy {
}
@Override
- public RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy
retryPolicy) {
+ public RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy
retryPolicy) {
if (!EXPONENTIAL_BACKOFF.equals(retryPolicy.getStrategyCase())) {
throw new IllegalArgumentException("strategy must be exponential
backoff");
}
- return updateBackoff(retryPolicy.getExponentialBackoff());
+ return inheritBackoff(retryPolicy.getExponentialBackoff());
}
- private RetryPolicy updateBackoff(ExponentialBackoff backoff) {
+ private RetryPolicy inheritBackoff(ExponentialBackoff backoff) {
return new ExponentialBackoffRetryPolicy(maxAttempts,
Duration.ofNanos(Durations.toNanos(backoff.getInitial())),
Duration.ofNanos(Durations.toNanos(backoff.getMax())),
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
index 43d9fa2..e6ed336 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/retry/RetryPolicy.java
@@ -44,7 +44,10 @@ public interface RetryPolicy {
* @param retryPolicy retry policy which contains the backoff strategy.
* @return the new retry policy.
*/
- RetryPolicy updateBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
+ RetryPolicy inheritBackoff(apache.rocketmq.v2.RetryPolicy retryPolicy);
+ /**
+ * Convert to {@link apache.rocketmq.v2.RetryPolicy}.
+ */
apache.rocketmq.v2.RetryPolicy toProtobuf();
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 8b4985c..ef94469 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -17,186 +17,98 @@
package org.apache.rocketmq.client.java.impl.producer;
-import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
import apache.rocketmq.v2.Broker;
-import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.MessageType;
import apache.rocketmq.v2.Permission;
-import apache.rocketmq.v2.Publishing;
-import apache.rocketmq.v2.QueryRouteRequest;
-import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.SendMessageRequest;
-import apache.rocketmq.v2.SendMessageResponse;
-import apache.rocketmq.v2.Settings;
-import apache.rocketmq.v2.Status;
-import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Metadata;
-import io.grpc.stub.StreamObserver;
-import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.producer.SendReceipt;
-import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class ProducerImplTest extends TestBase {
- @Mock
- private ClientManagerImpl clientManager;
- @Mock
- private StreamObserver<TelemetryCommand> telemetryRequestObserver;
-
private final ClientConfiguration clientConfiguration =
ClientConfiguration.newBuilder()
.setEndpoints(FAKE_ACCESS_POINT).build();
- private final String[] str = {FAKE_TOPIC_0};
- private final Set<String> set = new HashSet<>(Arrays.asList(str));
-
- @InjectMocks
- private final ProducerImpl producer = new
ProducerImpl(clientConfiguration, set, 1, null);
-
- @InjectMocks
- private final ProducerImpl producerWithoutTopicBinding = new
ProducerImpl(clientConfiguration, new HashSet<>(), 1,
- null);
-
- private void start(ProducerImpl producer) throws ClientException {
- SettableFuture<RpcInvocation<QueryRouteResponse>> future0 =
SettableFuture.create();
- Status status = Status.newBuilder().setCode(Code.OK).build();
+ @SuppressWarnings("SameParameterValue")
+ private ProducerImpl createProducerWithTopic(String topic) {
List<MessageQueue> messageQueueList = new ArrayList<>();
- MessageQueue mq =
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
+ MessageQueue mq =
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(topic))
.setPermission(Permission.READ_WRITE)
+ .addAcceptMessageTypes(MessageType.NORMAL)
.setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
.setId(0).build();
messageQueueList.add(mq);
- QueryRouteResponse response =
QueryRouteResponse.newBuilder().setStatus(status)
- .addAllMessageQueues(messageQueueList).build();
- final RpcInvocation<QueryRouteResponse> rpcInvocation =
- new RpcInvocation<>(response, fakeRpcContext());
- future0.set(rpcInvocation);
- when(clientManager.queryRoute(any(Endpoints.class),
any(Metadata.class), any(QueryRouteRequest.class),
- any(Duration.class)))
- .thenReturn(future0);
- when(clientManager.telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(ClientSessionImpl.class)))
- .thenReturn(telemetryRequestObserver);
- final ScheduledThreadPoolExecutor scheduler = new
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
- "TestScheduler"));
- when(clientManager.getScheduler()).thenReturn(scheduler);
-
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
-
- int messageMaxBodySize = 1024 * 1024 * 4;
- Publishing publishing =
Publishing.newBuilder().setMaxBodySize(messageMaxBodySize).build();
- Settings settings =
Settings.newBuilder().setPublishing(publishing).build();
- final Service service = producer.startAsync();
- producer.getClientSettings().applySettingsCommand(settings);
- service.awaitRunning();
- }
-
- private void shutdown(ProducerImpl producer) {
- final Service clientManagerService = mock(Service.class);
- when(clientManager.stopAsync()).thenReturn(clientManagerService);
- doNothing().when(clientManagerService).awaitTerminated();
- producer.stopAsync().awaitTerminated();
+ final TopicRouteData topicRouteData = new
TopicRouteData(messageQueueList);
+ final PublishingLoadBalancer publishingLoadBalancer = new
PublishingLoadBalancer(topicRouteData);
+ final Set<String> set = new HashSet<>();
+ set.add(topic);
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
+ producer.publishingRouteDataCache.put(topic, publishingLoadBalancer);
+ final Service mockedService = mock(Service.class);
+ Mockito.doReturn(mockedService).when(producer).startAsync();
+ Mockito.doReturn(mockedService).when(producer).stopAsync();
+ Mockito.doReturn(true).when(producer).isRunning();
+ producer.startAsync().awaitRunning();
+ return producer;
}
@Test(expected = IllegalStateException.class)
- public void testSendWithoutStart() throws ClientException {
+ public void testSendBeforeStartup() throws ClientException {
+ final Set<String> set = Collections.singleton(FAKE_TOPIC_0);
+ final ProducerImpl producer = Mockito.spy(new
ProducerImpl(clientConfiguration, set, 1, null));
final Message message = fakeMessage(FAKE_TOPIC_0);
producer.send(message);
}
@Test
- @Ignore
- public void testSendWithTopicBinding() throws ClientException,
ExecutionException, InterruptedException {
- start(producer);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
- any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class),
- any(Duration.class), any(ClientSessionImpl.class));
+ public void testSendWithTopic() throws Exception {
+ final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0);
final Message message = fakeMessage(FAKE_TOPIC_0);
- final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
- okSendMessageResponseFutureWithSingleEntry();
- when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
- any(Duration.class))).thenReturn(future);
- final SendMessageResponse response = future.get().getResponse();
- assertEquals(1, response.getEntriesCount());
- final apache.rocketmq.v2.SendResultEntry receipt =
response.getEntriesList().iterator().next();
- final SendReceipt sendReceipt = producer.send(message);
- assertEquals(receipt.getMessageId(),
sendReceipt.getMessageId().toString());
- shutdown(producer);
+ final MessageQueueImpl messageQueue =
fakeMessageQueueImpl(FAKE_TOPIC_0);
+ final SendReceiptImpl sendReceiptImpl =
fakeSendReceiptImpl(messageQueue);
+
Mockito.doReturn(Futures.immediateFuture(Collections.singletonList(sendReceiptImpl)))
+ .when(producer).send0(any(Metadata.class), any(Endpoints.class),
anyList(), any(MessageQueueImpl.class));
+ producer.send(message);
+ verify(producer, times(1)).send0(any(Metadata.class),
any(Endpoints.class), anyList(),
+ any(MessageQueueImpl.class));
+ producer.close();
}
- @Test
- @Ignore
- public void testSendWithoutTopicBinding() throws ClientException,
ExecutionException, InterruptedException {
- start(producerWithoutTopicBinding);
- verify(clientManager, never()).queryRoute(any(Endpoints.class),
any(Metadata.class),
- any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, never()).telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(ClientSessionImpl.class));
+ @Test(expected = IllegalArgumentException.class)
+ public void testSendFailureWithTopic() throws ClientException {
+ final ProducerImpl producer = createProducerWithTopic(FAKE_TOPIC_0);
final Message message = fakeMessage(FAKE_TOPIC_0);
- final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
- okSendMessageResponseFutureWithSingleEntry();
- when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
- any(Duration.class))).thenReturn(future);
- final SendMessageResponse response = future.get().getResponse();
- assertEquals(1, response.getEntriesCount());
- final SendReceipt sendReceipt =
producerWithoutTopicBinding.send(message);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
- any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class),
- any(Duration.class), any(ClientSessionImpl.class));
- final apache.rocketmq.v2.SendResultEntry receipt =
response.getEntriesList().iterator().next();
- assertEquals(receipt.getMessageId(),
sendReceipt.getMessageId().toString());
- shutdown(producerWithoutTopicBinding);
- }
-
- @Test(expected = ClientException.class)
- @Ignore
- public void testSendMessageWithFailure() throws ClientException {
- start(producer);
- verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
- any(QueryRouteRequest.class), any(Duration.class));
- verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(ClientSessionImpl.class));
- final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
failureSendMessageResponseFuture();
- when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
- any(Duration.class))).thenReturn(future);
- Message message0 = fakeMessage(FAKE_TOPIC_0);
- try {
- producer.send(message0);
- } finally {
- shutdown(producer);
- }
+ final Exception exception = new IllegalArgumentException();
+ Mockito.doReturn(Futures.immediateFailedFuture(exception))
+ .when(producer).send0(any(Metadata.class), any(Endpoints.class),
anyList(), any(MessageQueueImpl.class));
+ producer.send(message);
+ final int maxAttempts =
producer.producerSettings.getRetryPolicy().getMaxAttempts();
+ verify(producer, times(maxAttempts)).send0(any(Metadata.class),
any(Endpoints.class), anyList(),
+ any(MessageQueueImpl.class));
+ producer.close();
}
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
index 0faaf23..d5a92d4 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/CustomizedBackoffRetryPolicyTest.java
@@ -21,11 +21,13 @@ import static
apache.rocketmq.v2.RetryPolicy.StrategyCase.CUSTOMIZED_BACKOFF;
import static org.junit.Assert.assertEquals;
import apache.rocketmq.v2.CustomizedBackoff;
-import apache.rocketmq.v2.RetryPolicy;
+import apache.rocketmq.v2.ExponentialBackoff;
import com.google.protobuf.util.Durations;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import org.junit.Assert;
import org.junit.Test;
public class CustomizedBackoffRetryPolicyTest {
@@ -46,6 +48,51 @@ public class CustomizedBackoffRetryPolicyTest {
assertEquals(duration1, policy.getNextAttemptDelay(4));
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetNextAttemptDelayWithIllegalAttempt() {
+ int maxAttempt = 3;
+ Duration duration0 = Duration.ofSeconds(1);
+ Duration duration1 = Duration.ofSeconds(2);
+ List<Duration> durations = new ArrayList<>();
+ durations.add(duration0);
+ durations.add(duration1);
+ final CustomizedBackoffRetryPolicy retryPolicy = new
CustomizedBackoffRetryPolicy(durations, maxAttempt);
+ retryPolicy.getNextAttemptDelay(0);
+ }
+
+ @Test
+ public void testFromProtobuf() {
+ com.google.protobuf.Duration duration0 = Durations.fromSeconds(1);
+ com.google.protobuf.Duration duration1 = Durations.fromSeconds(2);
+ com.google.protobuf.Duration duration2 = Durations.fromSeconds(3);
+ List<com.google.protobuf.Duration> durations = new ArrayList<>();
+ durations.add(duration0);
+ durations.add(duration1);
+ durations.add(duration2);
+ CustomizedBackoff customizedBackoff =
CustomizedBackoff.newBuilder().addAllNext(durations).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
+
apache.rocketmq.v2.RetryPolicy.newBuilder().setCustomizedBackoff(customizedBackoff).setMaxAttempts(3)
+ .build();
+ CustomizedBackoffRetryPolicy retryPolicy0 =
CustomizedBackoffRetryPolicy.fromProtobuf(retryPolicy);
+ final List<Duration> durations0 = durations.stream()
+ .map(duration ->
Duration.ofNanos(Durations.toNanos(duration))).collect(Collectors.toList());
+ Assert.assertEquals(retryPolicy0.getDurations(), durations0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromProtobufWithoutCustomizedBackoff() {
+ com.google.protobuf.Duration initialBackoff0 =
Durations.fromSeconds(1);
+ com.google.protobuf.Duration maxBackoff0 = Durations.fromSeconds(1);
+ float backoffMultiplier = 1.0f;
+ int maxAttempts = 3;
+ ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
+
.setInitial(initialBackoff0).setMax(maxBackoff0).setMultiplier(backoffMultiplier).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+ .setMaxAttempts(maxAttempts)
+ .setExponentialBackoff(exponentialBackoff).build();
+ CustomizedBackoffRetryPolicy.fromProtobuf(retryPolicy);
+ }
+
@Test
public void testToProtobuf() {
int maxAttempt = 3;
@@ -55,7 +102,7 @@ public class CustomizedBackoffRetryPolicyTest {
durations.add(duration0);
durations.add(duration1);
final CustomizedBackoffRetryPolicy policy = new
CustomizedBackoffRetryPolicy(durations, maxAttempt);
- final RetryPolicy protobuf = policy.toProtobuf();
+ final apache.rocketmq.v2.RetryPolicy protobuf = policy.toProtobuf();
assertEquals(maxAttempt, protobuf.getMaxAttempts());
assertEquals(CUSTOMIZED_BACKOFF, protobuf.getStrategyCase());
final CustomizedBackoff backoff = protobuf.getCustomizedBackoff();
@@ -65,4 +112,54 @@ public class CustomizedBackoffRetryPolicyTest {
assertEquals(durations.get(i).toNanos(),
Durations.toNanos(next.get(i)));
}
}
+
+ @Test
+ public void testInheritBackoff() {
+ List<com.google.protobuf.Duration> durations0 = new ArrayList<>();
+ com.google.protobuf.Duration duration0 = Durations.fromSeconds(1);
+ com.google.protobuf.Duration duration1 = Durations.fromSeconds(2);
+ com.google.protobuf.Duration duration2 = Durations.fromSeconds(3);
+ durations0.add(duration0);
+ durations0.add(duration1);
+ durations0.add(duration2);
+ int maxAttempt0 = 5;
+ CustomizedBackoff customizedBackoff =
CustomizedBackoff.newBuilder().addAllNext(durations0).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+ .setCustomizedBackoff(customizedBackoff)
+ .setMaxAttempts(maxAttempt0)
+ .build();
+ int maxAttempt = 3;
+ Duration duration3 = Duration.ofSeconds(3);
+ Duration duration4 = Duration.ofSeconds(2);
+ Duration duration5 = Duration.ofSeconds(1);
+ List<Duration> durations1 = new ArrayList<>();
+ durations1.add(duration3);
+ durations1.add(duration4);
+ durations1.add(duration5);
+ final CustomizedBackoffRetryPolicy retryPolicy0 = new
CustomizedBackoffRetryPolicy(durations1, maxAttempt);
+ final RetryPolicy retryPolicy1 =
retryPolicy0.inheritBackoff(retryPolicy);
+ Assert.assertTrue(retryPolicy1 instanceof
CustomizedBackoffRetryPolicy);
+ CustomizedBackoffRetryPolicy customizedBackoffRetryPolicy =
(CustomizedBackoffRetryPolicy) retryPolicy1;
+ final List<Duration> durations2 = durations0.stream()
+ .map(duration ->
Duration.ofNanos(Durations.toNanos(duration))).collect(Collectors.toList());
+ Assert.assertEquals(customizedBackoffRetryPolicy.getDurations(),
durations2);
+ Assert.assertEquals(customizedBackoffRetryPolicy.getMaxAttempts(),
maxAttempt);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInheritBackoffWithoutCustomizedBackoff() {
+ int maxAttempt = 3;
+ Duration duration3 = Duration.ofSeconds(3);
+ Duration duration4 = Duration.ofSeconds(2);
+ Duration duration5 = Duration.ofSeconds(1);
+ List<Duration> durations1 = new ArrayList<>();
+ durations1.add(duration3);
+ durations1.add(duration4);
+ durations1.add(duration5);
+ final CustomizedBackoffRetryPolicy retryPolicy0 = new
CustomizedBackoffRetryPolicy(durations1, maxAttempt);
+ ExponentialBackoff exponentialBackoff =
ExponentialBackoff.newBuilder().build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+ .setExponentialBackoff(exponentialBackoff).build();
+ retryPolicy0.inheritBackoff(retryPolicy);
+ }
}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicyTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicyTest.java
new file mode 100644
index 0000000..c8b2e5f
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/retry/ExponentialBackoffRetryPolicyTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.retry;
+
+import apache.rocketmq.v2.CustomizedBackoff;
+import apache.rocketmq.v2.ExponentialBackoff;
+import com.google.protobuf.util.Durations;
+import java.time.Duration;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ExponentialBackoffRetryPolicyTest extends TestBase {
+
+ @Test
+ public void testNextAttemptDelayForImmediatelyRetryPolicy() {
+ final ExponentialBackoffRetryPolicy retryPolicy =
ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(3);
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(1), Duration.ZERO);
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(2), Duration.ZERO);
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(3), Duration.ZERO);
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(4), Duration.ZERO);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetNextAttemptDelayWithIllegalAttempt() {
+ Duration initialBackoff = Duration.ofMillis(5);
+ Duration maxBackoff = Duration.ofSeconds(1);
+ double backoffMultiplier = 5;
+ final ExponentialBackoffRetryPolicy retryPolicy = new
ExponentialBackoffRetryPolicy(3,
+ initialBackoff, maxBackoff, backoffMultiplier);
+ retryPolicy.getNextAttemptDelay(0);
+ }
+
+ @Test
+ public void testGetNextAttemptDelay() {
+ Duration initialBackoff = Duration.ofMillis(5);
+ Duration maxBackoff = Duration.ofSeconds(1);
+ double backoffMultiplier = 5;
+ final ExponentialBackoffRetryPolicy retryPolicy = new
ExponentialBackoffRetryPolicy(3,
+ initialBackoff, maxBackoff, backoffMultiplier);
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(1),
Duration.ofMillis(5));
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(2),
Duration.ofMillis(25));
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(3),
Duration.ofMillis(125));
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(4),
Duration.ofMillis(625));
+ Assert.assertEquals(retryPolicy.getNextAttemptDelay(5),
Duration.ofSeconds(1));
+ }
+
+ @Test
+ public void testFromProtobuf() {
+ final Duration initialBackoff = Duration.ofMillis(5);
+ final Duration maxBackoff = Duration.ofSeconds(1);
+ com.google.protobuf.Duration initialBackoff0 =
Durations.fromNanos(initialBackoff.toNanos());
+ com.google.protobuf.Duration maxBackoff0 =
Durations.fromNanos(maxBackoff.toNanos());
+ float backoffMultiplier = 5;
+ int maxAttempts = 3;
+ ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
+
.setInitial(initialBackoff0).setMax(maxBackoff0).setMultiplier(backoffMultiplier).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+ .setMaxAttempts(maxAttempts)
+ .setExponentialBackoff(exponentialBackoff).build();
+ ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy =
+ ExponentialBackoffRetryPolicy.fromProtobuf(retryPolicy);
+ Assert.assertEquals(exponentialBackoffRetryPolicy.getMaxAttempts(),
maxAttempts);
+ Assert.assertEquals(exponentialBackoffRetryPolicy.getInitialBackoff(),
initialBackoff);
+ Assert.assertEquals(exponentialBackoffRetryPolicy.getMaxBackoff(),
maxBackoff);
+
Assert.assertEquals(exponentialBackoffRetryPolicy.getBackoffMultiplier(),
backoffMultiplier, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testFromProtobufWithoutExponentialBackoff() {
+ int maxAttempts = 3;
+ CustomizedBackoff customizedBackoff =
CustomizedBackoff.newBuilder().addNext(Durations.fromSeconds(1)).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+
.setMaxAttempts(maxAttempts).setCustomizedBackoff(customizedBackoff).build();
+ ExponentialBackoffRetryPolicy.fromProtobuf(retryPolicy);
+ }
+
+ @Test
+ public void testToProtobuf() {
+ Duration initialBackoff = Duration.ofMillis(5);
+ Duration maxBackoff = Duration.ofSeconds(1);
+ double backoffMultiplier = 5;
+ int maxAttempts = 3;
+ final ExponentialBackoffRetryPolicy retryPolicy = new
ExponentialBackoffRetryPolicy(maxAttempts,
+ initialBackoff, maxBackoff, backoffMultiplier);
+ final apache.rocketmq.v2.RetryPolicy retryPolicy0 =
retryPolicy.toProtobuf();
+ Assert.assertTrue(retryPolicy0.hasExponentialBackoff());
+ final ExponentialBackoff exponentialBackoff =
retryPolicy0.getExponentialBackoff();
+ com.google.protobuf.Duration initialBackoff0 =
Durations.fromNanos(initialBackoff.toNanos());
+ com.google.protobuf.Duration maxBackoff0 =
Durations.fromNanos(maxBackoff.toNanos());
+ Assert.assertEquals(exponentialBackoff.getInitial(), initialBackoff0);
+ Assert.assertEquals(exponentialBackoff.getMax(), maxBackoff0);
+ Assert.assertEquals(exponentialBackoff.getMultiplier(),
backoffMultiplier, 0);
+ Assert.assertEquals(retryPolicy0.getMaxAttempts(), maxAttempts);
+ }
+
+ @Test
+ public void testInheritBackoff() {
+ Duration initialBackoff = Duration.ofMillis(5);
+ Duration maxBackoff = Duration.ofSeconds(1);
+ double backoffMultiplier = 5;
+ int maxAttempts = 3;
+ final ExponentialBackoffRetryPolicy retryPolicy = new
ExponentialBackoffRetryPolicy(maxAttempts,
+ initialBackoff, maxBackoff, backoffMultiplier);
+
+ Duration initialBackoff0 = Duration.ofMillis(10);
+ Duration maxBackoff0 = Duration.ofSeconds(3);
+ double backoffMultiplier0 = 10;
+ ExponentialBackoff exponentialBackoff = ExponentialBackoff.newBuilder()
+ .setInitial(Durations.fromNanos(initialBackoff0.toNanos()))
+
.setMax(Durations.fromNanos(maxBackoff0.toNanos())).setMultiplier((float)
backoffMultiplier0).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy0 =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+ .setExponentialBackoff(exponentialBackoff).build();
+ final RetryPolicy retryPolicy1 =
retryPolicy.inheritBackoff(retryPolicy0);
+ Assert.assertTrue(retryPolicy1 instanceof
ExponentialBackoffRetryPolicy);
+ ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy =
(ExponentialBackoffRetryPolicy) retryPolicy1;
+ Assert.assertEquals(exponentialBackoffRetryPolicy.getInitialBackoff(),
initialBackoff0);
+ Assert.assertEquals(exponentialBackoffRetryPolicy.getMaxBackoff(),
maxBackoff0);
+
Assert.assertEquals(exponentialBackoffRetryPolicy.getBackoffMultiplier(),
backoffMultiplier0, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInheritBackoffWithoutExponentialBackoff() {
+ int maxAttempts = 3;
+ CustomizedBackoff customizedBackoff =
CustomizedBackoff.newBuilder().addNext(Durations.fromSeconds(1)).build();
+ apache.rocketmq.v2.RetryPolicy retryPolicy =
apache.rocketmq.v2.RetryPolicy.newBuilder()
+
.setMaxAttempts(maxAttempts).setCustomizedBackoff(customizedBackoff).build();
+
+ Duration initialBackoff = Duration.ofMillis(5);
+ Duration maxBackoff = Duration.ofSeconds(1);
+ double backoffMultiplier = 5;
+ final ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy =
+ new ExponentialBackoffRetryPolicy(maxAttempts,
+ initialBackoff, maxBackoff, backoffMultiplier);
+ exponentialBackoffRetryPolicy.inheritBackoff(retryPolicy);
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index b7de438..cd4cb85 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -178,6 +178,10 @@ public class TestBase {
keys, properties, FAKE_HOST_0, 1, 1, mq, FAKE_RECEIPT_HANDLE_0,
null, 1, corrupted, null);
}
+ protected MessageQueueImpl fakeMessageQueueImpl(String topic) {
+ return new
MessageQueueImpl(fakePbMessageQueue0(Resource.newBuilder().setName(topic).build()));
+ }
+
protected MessageQueueImpl fakeMessageQueueImpl0() {
return new MessageQueueImpl(fakePbMessageQueue0());
}
@@ -363,7 +367,7 @@ public class TestBase {
MessageQueueImpl mq) throws ExecutionException, InterruptedException,
ClientException {
final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
okSendMessageResponseFutureWithSingleEntry();
- final List<SendReceiptImpl> receipts =
SendReceiptImpl.processSendMessageResponseInvocation(mq, future.get());
+ final List<SendReceiptImpl> receipts =
SendReceiptImpl.processResponseInvocation(mq, future.get());
return receipts.iterator().next();
}
}
diff --git a/java/pom.xml b/java/pom.xml
index 89b1120..64168d0 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -290,6 +290,9 @@
</execution>
</executions>
<configuration>
+ <excludes>
+ <exclude>**/example/**/*.class</exclude>
+ </excludes>
<rules>
<rule>
<element>CLASS</element>