This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch 5.0.0-preview in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 2bf133be29c444f0aea648133123a2d9a61ffb07 Merge: 48d3c7e 1e8e728 Author: odbozhou <[email protected]> AuthorDate: Fri Sep 24 15:35:46 2021 +0800 Merge branch 'develop' into 5.0.0-preview # Conflicts: # .travis.yml # acl/pom.xml # broker/pom.xml # broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java # broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java # broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java # client/pom.xml # client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java # client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java # client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java # common/pom.xml # common/src/main/java/org/apache/rocketmq/common/MQVersion.java # common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java # distribution/pom.xml # example/pom.xml # filter/pom.xml # logappender/pom.xml # logging/pom.xml # namesrv/pom.xml # namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java # namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java # openmessaging/pom.xml # pom.xml # remoting/pom.xml # srvutil/pom.xml # store/pom.xml # store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java # test/pom.xml # tools/pom.xml # tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java .github/workflows/greetings.yml | 29 + README.md | 2 +- .../org/apache/rocketmq/acl/common/AclUtils.java | 69 +- .../rocketmq/acl/plain/PlainPermissionManager.java | 53 +- .../acl/plain/RemoteAddressStrategyFactory.java | 2 +- .../apache/rocketmq/acl/common/AclUtilsTest.java | 1 + .../acl/plain/PlainAccessValidatorTest.java | 53 +- .../broker/pagecache/ManyMessageTransfer.java | 27 + .../broker/pagecache/OneMessageTransfer.java | 27 + .../broker/pagecache/QueryMessageTransfer.java | 27 + .../broker/plugin/AbstractPluginMessageStore.java | 4 +- .../processor/AbstractSendMessageProcessor.java | 89 ++- .../broker/processor/AdminBrokerProcessor.java | 20 +- .../broker/processor/ReplyMessageProcessor.java | 4 +- .../broker/processor/SendMessageProcessor.java | 120 ++- .../AbstractTransactionalMessageCheckListener.java | 2 - .../DefaultTransactionalMessageCheckListener.java | 4 +- .../AbstractSendMessageProcessorTest.java | 82 ++ client/pom.xml | 4 - .../org/apache/rocketmq/client/ClientConfig.java | 13 +- .../client/consumer/DefaultLitePullConsumer.java | 4 +- .../client/consumer/DefaultMQPushConsumer.java | 18 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 28 +- .../ConsumeMessageConcurrentlyService.java | 28 +- .../consumer/ConsumeMessageOrderlyService.java | 4 +- .../impl/consumer/DefaultLitePullConsumerImpl.java | 64 +- .../impl/consumer/DefaultMQPushConsumerImpl.java | 2 +- .../client/impl/consumer/RebalanceImpl.java | 2 +- .../impl/consumer/RebalanceLitePullImpl.java | 3 +- .../client/impl/factory/MQClientInstance.java | 42 +- .../impl/producer/DefaultMQProducerImpl.java | 61 +- .../client/producer/DefaultMQProducer.java | 54 +- .../client/producer/RequestFutureTable.java | 7 + .../client/trace/AsyncTraceDispatcher.java | 74 +- .../rocketmq/client/trace/TraceDataEncoder.java | 1 - .../apache/rocketmq/client/trace/TraceView.java | 4 +- .../rocketmq/client/impl/MQClientAPIImplTest.java | 27 + .../ConsumeMessageConcurrentlyServiceTest.java | 2 +- .../trace/DefaultMQConsumerWithTraceTest.java | 9 + .../DefaultMQLitePullConsumerWithTraceTest.java | 12 +- .../trace/DefaultMQProducerWithTraceTest.java | 10 + .../client/trace/TraceDataEncoderTest.java | 106 ++- .../rocketmq/client/trace/TraceViewTest.java | 4 +- .../org/apache/rocketmq/common/BrokerConfig.java | 8 +- .../java/org/apache/rocketmq/common/MixAll.java | 1 + .../java/org/apache/rocketmq/common/UtilAll.java | 33 +- .../apache/rocketmq/common/message/Message.java | 11 +- .../rocketmq/common/message/MessageDecoder.java | 41 +- .../rocketmq/common/protocol/NamespaceUtil.java | 2 +- .../rocketmq/common/protocol/RequestCode.java | 2 + .../namesrv/AddWritePermOfBrokerRequestHeader.java | 39 + .../AddWritePermOfBrokerResponseHeader.java | 38 + .../apache/rocketmq/common/stats/StatsItem.java | 22 +- .../apache/rocketmq/common/stats/StatsItemSet.java | 8 +- .../common/utils/NameServerAddressUtils.java | 1 + .../org/apache/rocketmq/common/UtilAllTest.java | 13 + .../common/message/MessageDecoderTest.java | 108 +++ .../rocketmq/common/stats/StatsItemSetTest.java | 4 +- .../rocketmq/common/utils/IOTinyUtilsTest.java | 4 +- distribution/benchmark/batchproducer.sh | 18 + distribution/bin/runbroker.cmd | 2 +- distribution/bin/runbroker.sh | 2 +- distribution/bin/runserver.cmd | 2 +- distribution/bin/runserver.sh | 6 +- distribution/release.xml | 4 + docs/cn/Configuration_System.md | 70 ++ docs/cn/Deployment.md | 159 ++++ docs/cn/Example_Batch.md | 82 ++ docs/cn/Example_Delay.md | 85 +++ docs/cn/Example_Simple_cn.md | 136 ++++ docs/cn/FAQ.md | 110 +++ docs/cn/RocketMQ_Example.md | 8 +- docs/cn/best_practice.md | 2 +- docs/cn/design.md | 4 +- docs/cn/image/rocketmq_architecture_1.png | Bin 89290 -> 62810 bytes docs/cn/image/rocketmq_architecture_3.png | Bin 106758 -> 74884 bytes docs/cn/msg_trace/user_guide.md | 19 +- docs/cn/operation.md | 10 +- docs/en/CLITools.md | 8 + docs/en/Example_Transaction.md | 2 +- docs/en/best_practice.md | 2 +- .../client/java/API_Reference_DefaultMQProducer.md | 71 ++ docs/en/image/rocketmq_architecture_1.png | Bin 89290 -> 62810 bytes docs/en/image/rocketmq_architecture_3.png | Bin 106758 -> 74884 bytes docs/en/msg_trace/user_guide.md | 16 + docs/en/operation.md | 2 +- .../rocketmq/example/benchmark/BatchProducer.java | 403 ++++++++++ .../rocketmq/example/benchmark/Consumer.java | 66 +- .../rocketmq/example/benchmark/Producer.java | 157 ++-- .../example/benchmark/TransactionProducer.java | 117 +-- .../rocketmq/example/quickstart/Producer.java | 33 + .../rocketmq/example/simple/PullConsumer.java | 154 ++-- .../filter/expression/UnaryExpression.java | 16 +- .../rocketmq/filter/parser/ParseException.java | 7 +- .../rocketmq/filter/parser/TokenMgrError.java | 3 +- .../org/apache/rocketmq/filter/ParserTest.java | 2 +- .../namesrv/processor/DefaultRequestProcessor.java | 24 + .../namesrv/routeinfo/RouteInfoManager.java | 41 +- .../namesrv/routeinfo/RouteInfoManagerTest.java | 52 +- pom.xml | 2 +- .../rocketmq/remoting/common/RemotingHelper.java | 2 +- .../rocketmq/remoting/netty/NettyClientConfig.java | 8 +- .../rocketmq/remoting/netty/NettyLogger.java | 46 ++ .../remoting/netty/NettyRemotingAbstract.java | 7 +- .../rocketmq/remoting/netty/NettySystemConfig.java | 16 + .../remoting/netty/NettyClientConfigTest.java | 64 ++ .../rocketmq/store/AppendMessageCallback.java | 5 +- .../apache/rocketmq/store/AppendMessageResult.java | 17 + .../java/org/apache/rocketmq/store/CommitLog.java | 835 ++++++++------------- .../apache/rocketmq/store/DefaultMessageStore.java | 73 +- .../java/org/apache/rocketmq/store/MappedFile.java | 26 +- .../rocketmq/store/MessageExtBrokerInner.java | 12 + .../org/apache/rocketmq/store/MessageStore.java | 4 +- .../rocketmq/store/SelectMappedBufferResult.java | 7 - .../apache/rocketmq/store/StoreStatsService.java | 109 +-- .../rocketmq/store/config/MessageStoreConfig.java | 7 +- .../rocketmq/store/dledger/DLedgerCommitLog.java | 250 +----- .../org/apache/rocketmq/store/ha/HAService.java | 63 +- .../apache/rocketmq/store/ha/WaitNotifyObject.java | 64 +- .../store/schedule/ScheduleMessageService.java | 5 + .../apache/rocketmq/store/stats/BrokerStats.java | 4 +- .../rocketmq/store/stats/BrokerStatsManager.java | 50 +- .../apache/rocketmq/store/AppendCallbackTest.java | 28 +- .../apache/rocketmq/store/BatchPutMessageTest.java | 18 +- .../rocketmq/store/StoreStatsServiceTest.java | 18 +- .../store/schedule/ScheduleMessageServiceTest.java | 6 +- .../test/java/stats/BrokerStatsManagerTest.java | 47 ++ .../rocketmq/tools/admin/DefaultMQAdminExt.java | 8 +- .../tools/admin/DefaultMQAdminExtImpl.java | 12 +- .../apache/rocketmq/tools/admin/MQAdminExt.java | 5 +- .../rocketmq/tools/command/MQAdminStartup.java | 4 + .../command/acl/DeleteAccessConfigSubCommand.java | 14 +- .../command/acl/UpdateAccessConfigSubCommand.java | 8 +- .../acl/UpdateGlobalWhiteAddrSubCommand.java | 6 +- .../consumer/GetConsumerConfigSubCommand.java | 146 ++++ .../message/QueryMsgByUniqueKeySubCommand.java | 5 +- .../tools/command/message/SendMessageCommand.java | 17 +- ...SubCommand.java => AddWritePermSubCommand.java} | 29 +- .../command/namesrv/WipeWritePermSubCommand.java | 2 +- .../tools/admin/DefaultMQAdminExtTest.java | 7 + .../consumer/GetConsumerConfigSubCommandTest.java | 83 ++ .../namesrv/AddWritePermSubCommandTest.java | 37 + 142 files changed, 3998 insertions(+), 1606 deletions(-) diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 4daa832,9d26e99..d2f751b --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@@ -21,19 -19,15 +21,22 @@@ import com.google.common.collect.Maps import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import java.net.SocketAddress; + import java.util.HashMap; import java.util.List; -import java.util.Map; +import java.util.Locale; - import java.util.Map; ++import java.util.Map;Optional ++ + import java.util.concurrent.ThreadLocalRandom; + +import java.util.Optional; - import java.util.Random; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.LongAdder; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; -import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.broker.topic.TopicConfigManager; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index b3309e1,86aab63..e7b7949 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@@ -154,7 -116,7 +154,8 @@@ import org.apache.rocketmq.remoting.net import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; + import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; +import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; @@@ -261,31 -235,9 +262,29 @@@ public class AdminBrokerProcessor exten return resumeCheckHalfMessage(ctx, request); case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG: return getBrokerClusterAclConfig(ctx, request); + case RequestCode.GET_TOPIC_CONFIG: + return getTopicConfig(ctx, request); + case RequestCode.UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING: + return updateTopicLogicalQueueMapping(ctx, request); + case RequestCode.DELETE_TOPIC_LOGICAL_QUEUE_MAPPING: + return deleteTopicLogicalQueueMapping(ctx, request); + case RequestCode.QUERY_TOPIC_LOGICAL_QUEUE_MAPPING: + return queryTopicLogicalQueueMapping(ctx, request); + case RequestCode.SEAL_TOPIC_LOGICAL_QUEUE: + return sealTopicLogicalQueue(ctx, request); + case RequestCode.REUSE_TOPIC_LOGICAL_QUEUE: + return reuseTopicLogicalQueue(ctx, request); + case RequestCode.CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE: + return createMessageQueueForLogicalQueue(ctx, request); + case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE: + return migrateTopicLogicalQueuePrepare(ctx, request); + case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT: + return migrateTopicLogicalQueueCommit(ctx, request); + case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY: + return migrateTopicLogicalQueueNotify(ctx, request); default: - break; + return getUnknownCmdResponse(ctx, request); } - - return null; } @Override diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 97b7e62,3a401e1..692c98c --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@@ -285,16 -292,18 +292,24 @@@ public class SendMessageProcessor exten msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes()); String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName(); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName); - msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) { + // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message. + // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it. + String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK); + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later + origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue); + } else { + msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + } + LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), response); + CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response); + if (future != null) { + return future; + } + CompletableFuture<PutMessageResult> putMessageResult = null; - Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (transFlag != null && Boolean.parseBoolean(transFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { diff --cc client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java index 7a457c1,7677d8b..ab0d885 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java @@@ -557,201 -401,9 +557,201 @@@ public abstract class RebalanceImpl log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } + + } + + if (!allMQLocked) { + mQClientFactory.rebalanceLater(500); + } + + this.dispatchPullRequest(pullRequestList, 500); + + return changed; + } + + private boolean updateMessageQueueAssignment(final String topic, final Set<MessageQueueAssignment> assignments, + final boolean isOrder) { + boolean changed = false; + + Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<MessageQueue, MessageQueueAssignment>(); + Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<MessageQueue, MessageQueueAssignment>(); + for (MessageQueueAssignment assignment : assignments) { + MessageQueue messageQueue = assignment.getMessageQueue(); + if (messageQueue == null) { + continue; + } + if (MessageRequestMode.POP == assignment.getMode()) { + mq2PopAssignment.put(messageQueue, assignment); + } else { + mq2PushAssignment.put(messageQueue, assignment); + } + } + + if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { + if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) { + //pop switch to push + //subscribe pop retry topic + try { + final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup()); + SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL); + getSubscriptionInner().put(retryTopic, subscriptionData); + } catch (Exception ignored) { + } + + } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) { + //push switch to pop + //unsubscribe pop retry topic + try { + final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup()); + getSubscriptionInner().remove(retryTopic); + } catch (Exception ignored) { + } + + } + } + + { + // drop process queues no longer belong me + HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size()); + Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, ProcessQueue> next = it.next(); + MessageQueue mq = next.getKey(); + ProcessQueue pq = next.getValue(); + + if (mq.getTopic().equals(topic)) { + if (!mq2PushAssignment.containsKey(mq)) { + pq.setDropped(true); + removeQueueMap.put(mq, pq); + } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) { + pq.setDropped(true); + removeQueueMap.put(mq, pq); + log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it", + consumerGroup, mq); + } + } + } + // remove message queues no longer belong me + for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) { + MessageQueue mq = entry.getKey(); + ProcessQueue pq = entry.getValue(); + + if (this.removeUnnecessaryMessageQueue(mq, pq)) { + this.processQueueTable.remove(mq); + changed = true; + log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); + } + } + } + + { + HashMap<MessageQueue, PopProcessQueue> removeQueueMap = new HashMap<MessageQueue, PopProcessQueue>(this.popProcessQueueTable.size()); + Iterator<Entry<MessageQueue, PopProcessQueue>> it = this.popProcessQueueTable.entrySet().iterator(); + while (it.hasNext()) { + Entry<MessageQueue, PopProcessQueue> next = it.next(); + MessageQueue mq = next.getKey(); + PopProcessQueue pq = next.getValue(); + + if (mq.getTopic().equals(topic)) { + if (!mq2PopAssignment.containsKey(mq)) { + //the queue is no longer your assignment + pq.setDropped(true); + removeQueueMap.put(mq, pq); + } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) { + pq.setDropped(true); + removeQueueMap.put(mq, pq); + log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it", + consumerGroup, mq); + } + } + } + // remove message queues no longer belong me + for (Entry<MessageQueue, PopProcessQueue> entry : removeQueueMap.entrySet()) { + MessageQueue mq = entry.getKey(); + PopProcessQueue pq = entry.getValue(); + + if (this.removeUnnecessaryPopMessageQueue(mq, pq)) { + this.popProcessQueueTable.remove(mq); + changed = true; + log.info("doRebalance, {}, remove unnecessary pop mq, {}", consumerGroup, mq); + } + } + } + + { + // add new message queue + boolean allMQLocked = true; + List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); + for (MessageQueue mq : mq2PushAssignment.keySet()) { + if (!this.processQueueTable.containsKey(mq)) { + if (isOrder && !this.lock(mq)) { + log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); + allMQLocked = false; + continue; + } + + this.removeDirtyOffset(mq); + ProcessQueue pq = createProcessQueue(); + pq.setLocked(true); + long nextOffset = -1L; + try { + nextOffset = this.computePullFromWhereWithException(mq); - } catch (MQClientException e) { ++ } catch (Exception e) { + log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); + continue; + } + + if (nextOffset >= 0) { + ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); + if (pre != null) { + log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); + } else { + log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); + PullRequest pullRequest = new PullRequest(); + pullRequest.setConsumerGroup(consumerGroup); + pullRequest.setNextOffset(nextOffset); + pullRequest.setMessageQueue(mq); + pullRequest.setProcessQueue(pq); + pullRequestList.add(pullRequest); + changed = true; + } + } else { + log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); + } + } + } + + if (!allMQLocked) { + mQClientFactory.rebalanceLater(500); + } + this.dispatchPullRequest(pullRequestList, 500); } - this.dispatchPullRequest(pullRequestList); + { + // add new message queue + List<PopRequest> popRequestList = new ArrayList<PopRequest>(); + for (MessageQueue mq : mq2PopAssignment.keySet()) { + if (!this.popProcessQueueTable.containsKey(mq)) { + PopProcessQueue pq = createPopProcessQueue(); + PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq); + if (pre != null) { + log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq); + } else { + log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq); + PopRequest popRequest = new PopRequest(); + popRequest.setTopic(topic); + popRequest.setConsumerGroup(consumerGroup); + popRequest.setMessageQueue(mq); + popRequest.setPopProcessQueue(pq); + popRequest.setInitMode(getConsumeInitMode()); + popRequestList.add(popRequest); + changed = true; + } + } + } + + this.dispatchPopPullRequest(popRequestList, 500); + } return changed; } diff --cc client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index c1a50dd,e897d49..dafc4f8 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@@ -676,7 -619,7 +676,7 @@@ public class MQClientInstance } } } else { - topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3, true, logicalQueueIdsFilter); - topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout()); ++ topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter); } if (topicRouteData != null) { TopicRouteData old = this.topicRouteTable.get(topic); diff --cc client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 8802a9c,bdc103f..ba479d2 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@@ -22,14 -20,10 +22,12 @@@ import java.io.IOException import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Random; import java.util.Set; - import java.util.Timer; - import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; diff --cc client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java index c91d55a,e1b3bed..34b34f9 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java @@@ -37,42 -25,14 +37,44 @@@ import org.apache.rocketmq.client.produ import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.AclConfig; +import org.apache.rocketmq.common.DataVersion; import org.apache.rocketmq.common.PlainAccessConfig; +import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageConst; + import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueueAssignment; +import org.apache.rocketmq.common.message.MessageRequestMode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo; +import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody; +import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader; +import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil; +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody; +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; + import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.exception.RemotingException; diff --cc common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 17cc2a1,f710cdb..bf2355c --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@@ -58,12 -57,10 +58,11 @@@ public class BrokerConfig @ImportantField private boolean traceTopicEnable = false; /** - * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default - * value is 1. + * thread numbers for send message thread pool. */ - private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4; + private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4); private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; + private int ackMessageThreadPoolNums = 3; private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2; private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors(); diff --cc common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java index 3613049,5624a7e..04f126b --- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java @@@ -193,18 -189,5 +193,20 @@@ public class RequestCode public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326; + public static final int ADD_WRITE_PERM_OF_BROKER = 327; ++ + public static final int GET_TOPIC_CONFIG = 351; + + public static final int QUERY_ASSIGNMENT = 400; + public static final int SET_MESSAGE_REQUEST_MODE = 401; + + public static final int UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING = 411; + public static final int DELETE_TOPIC_LOGICAL_QUEUE_MAPPING = 422; + public static final int QUERY_TOPIC_LOGICAL_QUEUE_MAPPING = 413; + public static final int SEAL_TOPIC_LOGICAL_QUEUE = 414; + public static final int REUSE_TOPIC_LOGICAL_QUEUE = 415; + public static final int CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE = 416; + public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417; + public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418; + public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419; } diff --cc store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index b11eb49,5bf68ac..152af7b --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@@ -35,7 -34,7 +35,8 @@@ import java.util.Set import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.ExecutionException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@@ -675,8 -634,8 +638,8 @@@ public class DefaultMessageStore implem continue; } - this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); + this.storeStatsService.getGetMessageTransferedMsgCount().add(1); - getResult.addMessage(selectResult); + getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE)); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } diff --cc store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index 9057ebe,1164ab8..c127515 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@@ -26,10 -24,13 +26,11 @@@ import java.util.TimerTask import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.rocketmq.common.ConfigManager; + import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; -import org.apache.rocketmq.common.topic.TopicValidator; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; diff --cc tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index c1b42f5,d701056..0da449d --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@@ -98,10 -90,9 +98,10 @@@ public interface MQAdminExt extends MQA final SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); + SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException; - TopicConfig examineTopicConfig(final String addr, final String topic); + TopicConfig examineTopicConfig(final String addr, + final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; TopicStatsTable examineTopicStats( final String topic) throws RemotingException, MQClientException, InterruptedException, diff --cc tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index 3e98939,4411a6c..1d29959 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@@ -49,14 -49,9 +49,15 @@@ import org.apache.rocketmq.tools.comman import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand; import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand; import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand; + import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand; +import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand; import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand; import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand; +import org.apache.rocketmq.tools.command.logicalqueue.DeleteTopicLogicalQueueMappingCommand; +import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand; +import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand; +import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand; +import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand; import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand; import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand; import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
