This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 017b7537d2 [ISSUE #8227] Optimize DefaultMQPushConsumer construction
method (#8228)
017b7537d2 is described below
commit 017b7537d2d02fc9a5815eac1f19b8060003fcf4
Author: yx9o <[email protected]>
AuthorDate: Thu Jun 13 12:09:11 2024 +0800
[ISSUE #8227] Optimize DefaultMQPushConsumer construction method (#8228)
---
.../client/consumer/DefaultMQPushConsumer.java | 23 ++++-----
.../client/consumer/DefaultMQPushConsumerTest.java | 58 ++++++++++++++++------
2 files changed, 51 insertions(+), 30 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 312f4632ca..38a412c237 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -16,10 +16,6 @@
*/
package org.apache.rocketmq.client.consumer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -40,12 +36,17 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* In most scenarios, this is the mostly recommended class to consume messages.
@@ -328,10 +329,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
* @param rpcHook RPC hook to execute before each remoting command.
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook) {
- this.consumerGroup = consumerGroup;
- this.rpcHook = rpcHook;
- this.allocateMessageQueueStrategy = new
AllocateMessageQueueAveragely();
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
+ this(consumerGroup, rpcHook, new AllocateMessageQueueAveragely());
}
@@ -355,10 +353,7 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
*/
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
- this.consumerGroup = consumerGroup;
- this.rpcHook = rpcHook;
- this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
- defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this,
rpcHook);
+ this(consumerGroup, rpcHook, allocateMessageQueueStrategy, false,
null);
}
/**
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 3943b92289..a10fd74b34 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -16,20 +16,6 @@
*/
package org.apache.rocketmq.client.consumer;
-import java.io.ByteArrayOutputStream;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -37,6 +23,8 @@ import
org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -53,6 +41,7 @@ import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalanceImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -62,7 +51,6 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -71,8 +59,27 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -80,6 +87,7 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -206,7 +214,7 @@ public class DefaultMQPushConsumerTest {
@Test
public void testStart_OffsetShouldNotNUllAfterStart() {
- Assert.assertNotNull(pushConsumer.getOffsetStore());
+ assertNotNull(pushConsumer.getOffsetStore());
}
@Test
@@ -388,4 +396,22 @@ public class DefaultMQPushConsumerTest {
pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(messageExts[0]).isNull();
}
+
+ @Test
+ public void assertCreatePushConsumer() {
+ DefaultMQPushConsumer pushConsumer1 = new
DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class));
+ assertNotNull(pushConsumer1);
+ assertEquals(consumerGroup, pushConsumer1.getConsumerGroup());
+ assertTrue(pushConsumer1.getAllocateMessageQueueStrategy() instanceof
AllocateMessageQueueAveragely);
+ assertNotNull(pushConsumer1.defaultMQPushConsumerImpl);
+ assertFalse(pushConsumer1.isEnableTrace());
+ assertTrue(UtilAll.isBlank(pushConsumer1.getTraceTopic()));
+ DefaultMQPushConsumer pushConsumer2 = new
DefaultMQPushConsumer(consumerGroup, mock(RPCHook.class), new
AllocateMessageQueueAveragelyByCircle());
+ assertNotNull(pushConsumer2);
+ assertEquals(consumerGroup, pushConsumer2.getConsumerGroup());
+ assertTrue(pushConsumer2.getAllocateMessageQueueStrategy() instanceof
AllocateMessageQueueAveragelyByCircle);
+ assertNotNull(pushConsumer2.defaultMQPushConsumerImpl);
+ assertFalse(pushConsumer2.isEnableTrace());
+ assertTrue(UtilAll.isBlank(pushConsumer2.getTraceTopic()));
+ }
}