This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1511809975 [ISSUE #8300] Add more test coverage for DefaultMQProducer 
(#8301)
1511809975 is described below

commit 1511809975a3e4f724286fa6e74090ad28598c03
Author: yx9o <[email protected]>
AuthorDate: Tue Jun 18 09:53:35 2024 +0800

    [ISSUE #8300] Add more test coverage for DefaultMQProducer (#8301)
    
    * [ISSUE #8300] Add more test coverage for DefaultMQProducer
    
    * Add more tests.
    
    * Update
---
 .../client/producer/DefaultMQProducerTest.java     | 195 +++++++++++++++++++--
 1 file changed, 182 insertions(+), 13 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 7e1fad6247..96086c7a25 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -28,7 +28,9 @@ import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.client.latency.MQFaultStrategy;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.compression.CompressionType;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -49,6 +51,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -82,15 +85,14 @@ public class DefaultMQProducerTest {
     private MQClientInstance mQClientFactory = 
MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
-    @Mock
-    private NettyRemotingClient nettyRemotingClient;
 
     private DefaultMQProducer producer;
     private Message message;
     private Message zeroMsg;
     private Message bigMessage;
-    private String topic = "FooBar";
-    private String producerGroupPrefix = "FooBar_PID";
+    private final String topic = "FooBar";
+    private final String producerGroupPrefix = "FooBar_PID";
+    private final long defaultTimeout = 3000L;
 
     @Before
     public void init() throws Exception {
@@ -196,7 +198,7 @@ public class DefaultMQProducerTest {
                 countDownLatch.countDown();
             }
         });
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -240,7 +242,7 @@ public class DefaultMQProducerTest {
         //this message is send success
         producer.send(message, sendCallback, 1000);
 
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(5);
 
         // off enableBackpressureForAsyncMode
@@ -253,7 +255,7 @@ public class DefaultMQProducerTest {
         //this message is send success
         producer.send(message, sendCallback, 1000);
 
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(10);
     }
 
@@ -301,7 +303,7 @@ public class DefaultMQProducerTest {
         // this message is send failed
         producer.send(msgs, new MessageQueue(), sendCallback, 1000);
 
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(1);
 
         // off enableBackpressureForAsyncMode
@@ -312,7 +314,7 @@ public class DefaultMQProducerTest {
         // this message is send failed
         producer.send(msgs, new MessageQueue(), sendCallback, 1000);
 
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(2);
     }
 
@@ -333,7 +335,7 @@ public class DefaultMQProducerTest {
             public void onException(Throwable e) {
             }
         });
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -472,7 +474,7 @@ public class DefaultMQProducerTest {
             future.setSendRequestOk(true);
             future.getRequestCallback().onSuccess(responseMsg);
         }
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
     }
 
     @Test
@@ -509,7 +511,7 @@ public class DefaultMQProducerTest {
                 future.getRequestCallback().onException(e);
             }
         }
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
         assertThat(cc.get()).isEqualTo(1);
     }
 
@@ -533,7 +535,7 @@ public class DefaultMQProducerTest {
             }
         });
 
-        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS);
         producer.setAutoBatch(false);
     }
 
@@ -662,4 +664,171 @@ public class DefaultMQProducerTest {
         assertTrue(producer5.isEnableTrace());
         assertEquals("custom_trace_topic", producer5.getTraceTopic());
     }
+
+    @Test
+    public void assertSend() throws MQBrokerException, RemotingException, 
InterruptedException, MQClientException, NoSuchFieldException, 
IllegalAccessException {
+        setDefaultMQProducerImpl();
+        setOtherParam();
+        SendResult send = producer.send(message, defaultTimeout);
+        assertNull(send);
+        Collection<Message> msgs = Collections.singletonList(message);
+        send = producer.send(msgs);
+        assertNull(send);
+        send = producer.send(msgs, defaultTimeout);
+        assertNull(send);
+    }
+
+    @Test
+    public void assertSendOneway() throws RemotingException, 
InterruptedException, MQClientException, NoSuchFieldException, 
IllegalAccessException {
+        setDefaultMQProducerImpl();
+        producer.sendOneway(message);
+        MessageQueue mq = mock(MessageQueue.class);
+        producer.sendOneway(message, mq);
+        MessageQueueSelector selector = mock(MessageQueueSelector.class);
+        producer.sendOneway(message, selector, 1);
+    }
+
+    @Test
+    public void assertSendByQueue() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException, 
NoSuchFieldException, IllegalAccessException {
+        setDefaultMQProducerImpl();
+        MessageQueue mq = mock(MessageQueue.class);
+        SendResult send = producer.send(message, mq);
+        assertNull(send);
+        send = producer.send(message, mq, defaultTimeout);
+        assertNull(send);
+        Collection<Message> msgs = Collections.singletonList(message);
+        send = producer.send(msgs, mq);
+        assertNull(send);
+        send = producer.send(msgs, mq, defaultTimeout);
+        assertNull(send);
+    }
+
+    @Test
+    public void assertSendByQueueSelector() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException, 
NoSuchFieldException, IllegalAccessException {
+        setDefaultMQProducerImpl();
+        MessageQueueSelector selector = mock(MessageQueueSelector.class);
+        SendResult send = producer.send(message, selector, 1);
+        assertNull(send);
+        send = producer.send(message, selector, 1, defaultTimeout);
+        assertNull(send);
+    }
+
+    @Test
+    public void assertRequest() throws MQBrokerException, RemotingException, 
InterruptedException, MQClientException, NoSuchFieldException, 
IllegalAccessException, RequestTimeoutException {
+        setDefaultMQProducerImpl();
+        MessageQueueSelector selector = mock(MessageQueueSelector.class);
+        Message replyNsg = producer.request(message, selector, 1, 
defaultTimeout);
+        assertNull(replyNsg);
+        RequestCallback requestCallback = mock(RequestCallback.class);
+        producer.request(message, selector, 1, requestCallback, 
defaultTimeout);
+        MessageQueue mq = mock(MessageQueue.class);
+        producer.request(message, mq, defaultTimeout);
+        producer.request(message, mq, requestCallback, defaultTimeout);
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void assertSendMessageInTransaction() throws MQClientException {
+        TransactionSendResult result = 
producer.sendMessageInTransaction(message, 1);
+        assertNull(result);
+    }
+
+    @Test
+    public void assertSearchOffset() throws MQClientException, 
NoSuchFieldException, IllegalAccessException {
+        setDefaultMQProducerImpl();
+        MessageQueue mq = mock(MessageQueue.class);
+        long result = producer.searchOffset(mq, System.currentTimeMillis());
+        assertEquals(0L, result);
+    }
+
+    @Test
+    public void assertBatchMaxDelayMs() throws NoSuchFieldException, 
IllegalAccessException {
+        setProduceAccumulator(true);
+        assertEquals(0, producer.getBatchMaxDelayMs());
+        setProduceAccumulator(false);
+        assertEquals(10, producer.getBatchMaxDelayMs());
+        producer.batchMaxDelayMs(1000);
+        assertEquals(1000, producer.getBatchMaxDelayMs());
+    }
+
+    @Test
+    public void assertBatchMaxBytes() throws NoSuchFieldException, 
IllegalAccessException {
+        setProduceAccumulator(true);
+        assertEquals(0L, producer.getBatchMaxBytes());
+        setProduceAccumulator(false);
+        assertEquals(32 * 1024L, producer.getBatchMaxBytes());
+        producer.batchMaxBytes(64 * 1024L);
+        assertEquals(64 * 1024L, producer.getBatchMaxBytes());
+    }
+
+    @Test
+    public void assertTotalBatchMaxBytes() throws NoSuchFieldException, 
IllegalAccessException {
+        setProduceAccumulator(true);
+        assertEquals(0L, producer.getTotalBatchMaxBytes());
+    }
+
+    @Test
+    public void assertGetRetryResponseCodes() {
+        assertNotNull(producer.getRetryResponseCodes());
+        assertEquals(7, producer.getRetryResponseCodes().size());
+    }
+
+    @Test
+    public void assertIsSendLatencyFaultEnable() {
+        assertFalse(producer.isSendLatencyFaultEnable());
+    }
+
+    @Test
+    public void assertGetLatencyMax() {
+        assertNotNull(producer.getLatencyMax());
+    }
+
+    @Test
+    public void assertGetNotAvailableDuration() {
+        assertNotNull(producer.getNotAvailableDuration());
+    }
+
+    @Test
+    public void assertIsRetryAnotherBrokerWhenNotStoreOK() {
+        assertFalse(producer.isRetryAnotherBrokerWhenNotStoreOK());
+    }
+
+    private void setOtherParam() {
+        producer.setCreateTopicKey("createTopicKey");
+        producer.setRetryAnotherBrokerWhenNotStoreOK(false);
+        producer.setDefaultTopicQueueNums(6);
+        producer.setRetryTimesWhenSendFailed(1);
+        producer.setSendMessageWithVIPChannel(false);
+        producer.setNotAvailableDuration(new long[1]);
+        producer.setLatencyMax(new long[1]);
+        producer.setSendLatencyFaultEnable(false);
+        producer.setRetryTimesWhenSendAsyncFailed(1);
+        producer.setTopics(Collections.singletonList(topic));
+        producer.setStartDetectorEnable(false);
+        producer.setCompressLevel(5);
+        producer.setCompressType(CompressionType.LZ4);
+        producer.addRetryResponseCode(0);
+        ExecutorService executorService = mock(ExecutorService.class);
+        producer.setAsyncSenderExecutor(executorService);
+    }
+
+    private void setProduceAccumulator(final boolean isDefault) throws 
NoSuchFieldException, IllegalAccessException {
+        ProduceAccumulator accumulator = null;
+        if (!isDefault) {
+            accumulator = new ProduceAccumulator("instanceName");
+        }
+        setField(producer, "produceAccumulator", accumulator);
+    }
+
+    private void setDefaultMQProducerImpl() throws NoSuchFieldException, 
IllegalAccessException {
+        DefaultMQProducerImpl producerImpl = mock(DefaultMQProducerImpl.class);
+        setField(producer, "defaultMQProducerImpl", producerImpl);
+        
when(producerImpl.getMqFaultStrategy()).thenReturn(mock(MQFaultStrategy.class));
+    }
+
+    private void setField(final Object target, final String fieldName, final 
Object newValue) throws NoSuchFieldException, IllegalAccessException {
+        Class<?> clazz = target.getClass();
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(target, newValue);
+    }
 }

Reply via email to