This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1511809975 [ISSUE #8300] Add more test coverage for DefaultMQProducer
(#8301)
1511809975 is described below
commit 1511809975a3e4f724286fa6e74090ad28598c03
Author: yx9o <[email protected]>
AuthorDate: Tue Jun 18 09:53:35 2024 +0800
[ISSUE #8300] Add more test coverage for DefaultMQProducer (#8301)
* [ISSUE #8300] Add more test coverage for DefaultMQProducer
* Add more tests.
* Update
---
.../client/producer/DefaultMQProducerTest.java | 195 +++++++++++++++++++--
1 file changed, 182 insertions(+), 13 deletions(-)
diff --git
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 7e1fad6247..96086c7a25 100644
---
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -28,7 +28,9 @@ import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
@@ -49,6 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -82,15 +85,14 @@ public class DefaultMQProducerTest {
private MQClientInstance mQClientFactory =
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
- @Mock
- private NettyRemotingClient nettyRemotingClient;
private DefaultMQProducer producer;
private Message message;
private Message zeroMsg;
private Message bigMessage;
- private String topic = "FooBar";
- private String producerGroupPrefix = "FooBar_PID";
+ private final String topic = "FooBar";
+ private final String producerGroupPrefix = "FooBar_PID";
+ private final long defaultTimeout = 3000L;
@Before
public void init() throws Exception {
@@ -196,7 +198,7 @@ public class DefaultMQProducerTest {
countDownLatch.countDown();
}
});
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
}
@Test
@@ -240,7 +242,7 @@ public class DefaultMQProducerTest {
//this message is send success
producer.send(message, sendCallback, 1000);
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(5);
// off enableBackpressureForAsyncMode
@@ -253,7 +255,7 @@ public class DefaultMQProducerTest {
//this message is send success
producer.send(message, sendCallback, 1000);
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(10);
}
@@ -301,7 +303,7 @@ public class DefaultMQProducerTest {
// this message is send failed
producer.send(msgs, new MessageQueue(), sendCallback, 1000);
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);
// off enableBackpressureForAsyncMode
@@ -312,7 +314,7 @@ public class DefaultMQProducerTest {
// this message is send failed
producer.send(msgs, new MessageQueue(), sendCallback, 1000);
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(2);
}
@@ -333,7 +335,7 @@ public class DefaultMQProducerTest {
public void onException(Throwable e) {
}
});
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
}
@Test
@@ -472,7 +474,7 @@ public class DefaultMQProducerTest {
future.setSendRequestOk(true);
future.getRequestCallback().onSuccess(responseMsg);
}
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
}
@Test
@@ -509,7 +511,7 @@ public class DefaultMQProducerTest {
future.getRequestCallback().onException(e);
}
}
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
assertThat(cc.get()).isEqualTo(1);
}
@@ -533,7 +535,7 @@ public class DefaultMQProducerTest {
}
});
- countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+ countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
producer.setAutoBatch(false);
}
@@ -662,4 +664,171 @@ public class DefaultMQProducerTest {
assertTrue(producer5.isEnableTrace());
assertEquals("custom_trace_topic", producer5.getTraceTopic());
}
+
+ @Test
+ public void assertSend() throws MQBrokerException, RemotingException,
InterruptedException, MQClientException, NoSuchFieldException,
IllegalAccessException {
+ setDefaultMQProducerImpl();
+ setOtherParam();
+ SendResult send = producer.send(message, defaultTimeout);
+ assertNull(send);
+ Collection<Message> msgs = Collections.singletonList(message);
+ send = producer.send(msgs);
+ assertNull(send);
+ send = producer.send(msgs, defaultTimeout);
+ assertNull(send);
+ }
+
+ @Test
+ public void assertSendOneway() throws RemotingException,
InterruptedException, MQClientException, NoSuchFieldException,
IllegalAccessException {
+ setDefaultMQProducerImpl();
+ producer.sendOneway(message);
+ MessageQueue mq = mock(MessageQueue.class);
+ producer.sendOneway(message, mq);
+ MessageQueueSelector selector = mock(MessageQueueSelector.class);
+ producer.sendOneway(message, selector, 1);
+ }
+
+ @Test
+ public void assertSendByQueue() throws MQBrokerException,
RemotingException, InterruptedException, MQClientException,
NoSuchFieldException, IllegalAccessException {
+ setDefaultMQProducerImpl();
+ MessageQueue mq = mock(MessageQueue.class);
+ SendResult send = producer.send(message, mq);
+ assertNull(send);
+ send = producer.send(message, mq, defaultTimeout);
+ assertNull(send);
+ Collection<Message> msgs = Collections.singletonList(message);
+ send = producer.send(msgs, mq);
+ assertNull(send);
+ send = producer.send(msgs, mq, defaultTimeout);
+ assertNull(send);
+ }
+
+ @Test
+ public void assertSendByQueueSelector() throws MQBrokerException,
RemotingException, InterruptedException, MQClientException,
NoSuchFieldException, IllegalAccessException {
+ setDefaultMQProducerImpl();
+ MessageQueueSelector selector = mock(MessageQueueSelector.class);
+ SendResult send = producer.send(message, selector, 1);
+ assertNull(send);
+ send = producer.send(message, selector, 1, defaultTimeout);
+ assertNull(send);
+ }
+
+ @Test
+ public void assertRequest() throws MQBrokerException, RemotingException,
InterruptedException, MQClientException, NoSuchFieldException,
IllegalAccessException, RequestTimeoutException {
+ setDefaultMQProducerImpl();
+ MessageQueueSelector selector = mock(MessageQueueSelector.class);
+ Message replyNsg = producer.request(message, selector, 1,
defaultTimeout);
+ assertNull(replyNsg);
+ RequestCallback requestCallback = mock(RequestCallback.class);
+ producer.request(message, selector, 1, requestCallback,
defaultTimeout);
+ MessageQueue mq = mock(MessageQueue.class);
+ producer.request(message, mq, defaultTimeout);
+ producer.request(message, mq, requestCallback, defaultTimeout);
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void assertSendMessageInTransaction() throws MQClientException {
+ TransactionSendResult result =
producer.sendMessageInTransaction(message, 1);
+ assertNull(result);
+ }
+
+ @Test
+ public void assertSearchOffset() throws MQClientException,
NoSuchFieldException, IllegalAccessException {
+ setDefaultMQProducerImpl();
+ MessageQueue mq = mock(MessageQueue.class);
+ long result = producer.searchOffset(mq, System.currentTimeMillis());
+ assertEquals(0L, result);
+ }
+
+ @Test
+ public void assertBatchMaxDelayMs() throws NoSuchFieldException,
IllegalAccessException {
+ setProduceAccumulator(true);
+ assertEquals(0, producer.getBatchMaxDelayMs());
+ setProduceAccumulator(false);
+ assertEquals(10, producer.getBatchMaxDelayMs());
+ producer.batchMaxDelayMs(1000);
+ assertEquals(1000, producer.getBatchMaxDelayMs());
+ }
+
+ @Test
+ public void assertBatchMaxBytes() throws NoSuchFieldException,
IllegalAccessException {
+ setProduceAccumulator(true);
+ assertEquals(0L, producer.getBatchMaxBytes());
+ setProduceAccumulator(false);
+ assertEquals(32 * 1024L, producer.getBatchMaxBytes());
+ producer.batchMaxBytes(64 * 1024L);
+ assertEquals(64 * 1024L, producer.getBatchMaxBytes());
+ }
+
+ @Test
+ public void assertTotalBatchMaxBytes() throws NoSuchFieldException,
IllegalAccessException {
+ setProduceAccumulator(true);
+ assertEquals(0L, producer.getTotalBatchMaxBytes());
+ }
+
+ @Test
+ public void assertGetRetryResponseCodes() {
+ assertNotNull(producer.getRetryResponseCodes());
+ assertEquals(7, producer.getRetryResponseCodes().size());
+ }
+
+ @Test
+ public void assertIsSendLatencyFaultEnable() {
+ assertFalse(producer.isSendLatencyFaultEnable());
+ }
+
+ @Test
+ public void assertGetLatencyMax() {
+ assertNotNull(producer.getLatencyMax());
+ }
+
+ @Test
+ public void assertGetNotAvailableDuration() {
+ assertNotNull(producer.getNotAvailableDuration());
+ }
+
+ @Test
+ public void assertIsRetryAnotherBrokerWhenNotStoreOK() {
+ assertFalse(producer.isRetryAnotherBrokerWhenNotStoreOK());
+ }
+
+ private void setOtherParam() {
+ producer.setCreateTopicKey("createTopicKey");
+ producer.setRetryAnotherBrokerWhenNotStoreOK(false);
+ producer.setDefaultTopicQueueNums(6);
+ producer.setRetryTimesWhenSendFailed(1);
+ producer.setSendMessageWithVIPChannel(false);
+ producer.setNotAvailableDuration(new long[1]);
+ producer.setLatencyMax(new long[1]);
+ producer.setSendLatencyFaultEnable(false);
+ producer.setRetryTimesWhenSendAsyncFailed(1);
+ producer.setTopics(Collections.singletonList(topic));
+ producer.setStartDetectorEnable(false);
+ producer.setCompressLevel(5);
+ producer.setCompressType(CompressionType.LZ4);
+ producer.addRetryResponseCode(0);
+ ExecutorService executorService = mock(ExecutorService.class);
+ producer.setAsyncSenderExecutor(executorService);
+ }
+
+ private void setProduceAccumulator(final boolean isDefault) throws
NoSuchFieldException, IllegalAccessException {
+ ProduceAccumulator accumulator = null;
+ if (!isDefault) {
+ accumulator = new ProduceAccumulator("instanceName");
+ }
+ setField(producer, "produceAccumulator", accumulator);
+ }
+
+ private void setDefaultMQProducerImpl() throws NoSuchFieldException,
IllegalAccessException {
+ DefaultMQProducerImpl producerImpl = mock(DefaultMQProducerImpl.class);
+ setField(producer, "defaultMQProducerImpl", producerImpl);
+
when(producerImpl.getMqFaultStrategy()).thenReturn(mock(MQFaultStrategy.class));
+ }
+
+ private void setField(final Object target, final String fieldName, final
Object newValue) throws NoSuchFieldException, IllegalAccessException {
+ Class<?> clazz = target.getClass();
+ Field field = clazz.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, newValue);
+ }
}