jack870131 commented on a change in pull request #2169:
URL: https://github.com/apache/rocketmq/pull/2169#discussion_r470074684
##########
File path:
broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
##########
@@ -152,4 +168,75 @@ private RemotingCommand
queryConsumerOffset(ChannelHandlerContext ctx, RemotingC
return response;
}
+
+ private RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx,
RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response =
+
RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
+ final AllocateMessageQueueRequestHeader requestHeader =
+ (AllocateMessageQueueRequestHeader)
request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
+ final AllocateMessageQueueRequestBody requestBody =
AllocateMessageQueueRequestBody.decode(request.getBody(),
+ AllocateMessageQueueRequestBody.class);
+
+ AllocateMessageQueueStrategy strategy = null;
+ String consumerGroup = requestHeader.getConsumerGroup();
+ String strategyName = requestHeader.getStrategyName();
+
+ if
(this.brokerController.getAllocateMessageQueueStrategyTable().containsKey(consumerGroup))
{
+ strategy =
this.brokerController.getAllocateMessageQueueStrategyTable().get(consumerGroup);
+ } else {
+ if
(strategyName.startsWith(AllocateMessageQueueStrategyConstants.ALLOCATE_MACHINE_ROOM_NEARBY))
{
+
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_STRATEGY_NOT_SUPPORTED);
+ response.setRemark("The broker does not support message queue
strategy " + strategyName);
+ return response;
+ } else {
+ switch (strategyName) {
+ case
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
+ strategy = new AllocateMessageQueueAveragely();
+ break;
+ case
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
+ strategy = new AllocateMessageQueueAveragelyByCircle();
+ break;
+ case
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
+ strategy = new AllocateMessageQueueByConfig();
+ break;
+ case
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
+ strategy = new AllocateMessageQueueByMachineRoom();
+ break;
+ case
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
+ strategy = new AllocateMessageQueueConsistentHash();
+ break;
+ case
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
+ strategy = new AllocateMessageQueueSticky(new
HashMap<String, List<MessageQueue>>());
+ default:
+ break;
+ }
+ }
+
this.brokerController.getAllocateMessageQueueStrategyTable().put(consumerGroup,
strategy);
+ }
Review comment:
Updated. The synchronized keyword has been applied to avoid race
conditions.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]