duhenglucky commented on a change in pull request #3694:
URL: https://github.com/apache/rocketmq/pull/3694#discussion_r780012318



##########
File path: 
broker/src/test/java/org/apache/rocketmq/broker/BrokerPathConfigHelperTest.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;

Review comment:
       Don't use * to replace all imports.

##########
File path: 
test/src/main/java/org/apache/rocketmq/test/lmq/benchmark/BenchLmqStore.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.test.lmq.benchmark;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.test.util.StatUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+public class BenchLmqStore {
+    private static Logger logger = 
LoggerFactory.getLogger(BenchLmqStore.class);
+    private static String namesrv = System.getProperty("namesrv", 
"127.0.0.1:9876");
+    private static String lmqTopic = System.getProperty("lmqTopic", 
"lmqTestTopic");
+    private static boolean enableSub = 
Boolean.parseBoolean(System.getProperty("enableSub", "true"));
+    private static String queuePrefix = System.getProperty("queuePrefix", 
"lmqTest");
+    private static int tps = Integer.parseInt(System.getProperty("tps", "1"));
+    private static int lmqNum = Integer.parseInt(System.getProperty("lmqNum", 
"1"));
+    private static int sendThreadNum = 
Integer.parseInt(System.getProperty("sendThreadNum", "64"));
+    private static int consumerThreadNum = 
Integer.parseInt(System.getProperty("consumerThreadNum", "64"));
+    private static String brokerName = System.getProperty("brokerName", 
"broker-a");
+    private static int size = Integer.parseInt(System.getProperty("size", 
"128"));
+    private static int suspendTime = 
Integer.parseInt(System.getProperty("suspendTime", "2000"));
+    private static final boolean RETRY_NO_MATCHED_MSG = 
Boolean.parseBoolean(System.getProperty("retry_no_matched_msg", "false"));
+    private static boolean benchOffset = 
Boolean.parseBoolean(System.getProperty("benchOffset", "false"));
+    private static int benchOffsetNum = 
Integer.parseInt(System.getProperty("benchOffsetNum", "1"));
+    private static Map<MessageQueue, Long> offsetMap = new 
ConcurrentHashMap<>(256);
+    private static Map<MessageQueue, Boolean> pullStatus = new 
ConcurrentHashMap<>(256);
+    private static Map<Integer, Map<MessageQueue, Long>> pullEvent = new 
ConcurrentHashMap<>(256);
+    public static DefaultMQProducer defaultMQProducer;
+    private static int pullConsumerNum = 
Integer.parseInt(System.getProperty("pullConsumerNum", "8"));
+    public static DefaultMQPullConsumer[] defaultMQPullConsumers = new 
DefaultMQPullConsumer[pullConsumerNum];
+    private static AtomicLong rid = new AtomicLong();
+    private static final String LMQ_PREFIX = "%LMQ%";
+
+    public static void main(String[] args) throws InterruptedException, 
MQClientException, MQBrokerException,
+        RemotingException {
+        defaultMQProducer = new DefaultMQProducer();
+        defaultMQProducer.setProducerGroup("PID_LMQ_TEST");
+        defaultMQProducer.setVipChannelEnabled(false);
+        defaultMQProducer.setNamesrvAddr(namesrv);
+        defaultMQProducer.start();
+        //defaultMQProducer.createTopic(lmqTopic, lmqTopic, 8);
+        for (int i = 0; i < pullConsumerNum; i++) {
+            DefaultMQPullConsumer defaultMQPullConsumer = new 
DefaultMQPullConsumer();
+            defaultMQPullConsumers[i] = defaultMQPullConsumer;
+            defaultMQPullConsumer.setNamesrvAddr(namesrv);
+            defaultMQPullConsumer.setVipChannelEnabled(false);
+            defaultMQPullConsumer.setConsumerGroup("CID_RMQ_SYS_LMQ_TEST_" + 
i);
+            defaultMQPullConsumer.setInstanceName("CID_RMQ_SYS_LMQ_TEST_" + i);
+            defaultMQPullConsumer.setRegisterTopics(new 
HashSet<>(Arrays.asList(lmqTopic)));
+            defaultMQPullConsumer.setBrokerSuspendMaxTimeMillis(suspendTime);
+            
defaultMQPullConsumer.setConsumerTimeoutMillisWhenSuspend(suspendTime + 1000);
+            defaultMQPullConsumer.start();
+        }
+        Thread.sleep(3000L);
+        if (benchOffset) {
+            doBenchOffset();
+            return;
+        }
+        ScheduledThreadPoolExecutor consumerPool = new 
ScheduledThreadPoolExecutor(consumerThreadNum, new ThreadFactoryImpl("test"));
+        for (int i = 0; i < consumerThreadNum; i++) {
+            final int idx = i;
+            consumerPool.scheduleWithFixedDelay(() -> {
+                try {
+                    Map<MessageQueue, Long> map = pullEvent.get(idx);
+                    if (map == null) {
+                        return;
+                    }
+                    for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) 
{
+                        try {
+                            Boolean status = pullStatus.get(entry.getKey());
+                            if (Boolean.TRUE.equals(status)) {
+                                continue;
+                            }
+                            doPull(map, entry.getKey(), entry.getValue());
+                        } catch (Exception e) {
+                            logger.error(" ", e);
+                        }
+                    }
+                } catch (Exception e) {

Review comment:
       Same with the last comment,nesting level should be reduced and error log 
may be can more clear

##########
File path: store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
##########
@@ -422,6 +429,47 @@ public void putMessagePositionInfoWrapper(DispatchRequest 
request) {
         this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
     }
 
+    private void multiDispatchQueue(DispatchRequest request, int maxRetries) {
+        Map<String, String> prop = request.getPropertiesMap();
+        String multiDispatchQueue = 
prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+        String multiQueueOffset = 
prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+        String[] queues = 
multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        String[] queueOffsets = 
multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+        if (queues.length != queueOffsets.length) {
+            log.error("[bug] queues.length!=queueOffsets.length ", 
request.getTopic());
+            return;
+        }
+        for (int i = 0; i < queues.length; i++) {
+            String queueName = queues[i];
+            long queueOffset = Long.parseLong(queueOffsets[i]);
+            int queueId = request.getQueueId();
+            if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() 
&& MixAll.isLmq(queueName)) {
+                queueId = 0;
+            }
+            ConsumeQueue cq = 
this.defaultMessageStore.findConsumeQueue(queueName, queueId);
+            boolean canWrite = 
this.defaultMessageStore.getRunningFlags().isCQWriteable();
+            for (int j = 0; j < maxRetries && canWrite; j++) {
+                boolean result = 
cq.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(),
+                    request.getTagsCode(),
+                    queueOffset);
+                if (result) {
+                    break;
+                } else {
+                    log.warn("[BUG]put commit log position info to " + 
queueName + ":" + queueId + " " + request.getCommitLogOffset()
+                        + " failed, retry " + j + " times");
+
+                    try {

Review comment:
       Would you like to reduce the nesting level?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to