jack870131 commented on a change in pull request #2169:
URL: https://github.com/apache/rocketmq/pull/2169#discussion_r470074684



##########
File path: 
broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
##########
@@ -152,4 +168,75 @@ private RemotingCommand 
queryConsumerOffset(ChannelHandlerContext ctx, RemotingC
 
         return response;
     }
+
+    private RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, 
RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response =
+            
RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
+        final AllocateMessageQueueRequestHeader requestHeader =
+            (AllocateMessageQueueRequestHeader) 
request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
+        final AllocateMessageQueueRequestBody requestBody = 
AllocateMessageQueueRequestBody.decode(request.getBody(),
+            AllocateMessageQueueRequestBody.class);
+
+        AllocateMessageQueueStrategy strategy = null;
+        String consumerGroup = requestHeader.getConsumerGroup();
+        String strategyName = requestHeader.getStrategyName();
+
+        if 
(this.brokerController.getAllocateMessageQueueStrategyTable().containsKey(consumerGroup))
 {
+            strategy = 
this.brokerController.getAllocateMessageQueueStrategyTable().get(consumerGroup);
+        } else {
+            if 
(strategyName.startsWith(AllocateMessageQueueStrategyConstants.ALLOCATE_MACHINE_ROOM_NEARBY))
 {
+                
response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_STRATEGY_NOT_SUPPORTED);
+                response.setRemark("The broker does not support message queue 
strategy " + strategyName);
+                return response;
+            } else {
+                switch (strategyName) {
+                    case 
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
+                        strategy = new AllocateMessageQueueAveragely();
+                        break;
+                    case 
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
+                        strategy = new AllocateMessageQueueAveragelyByCircle();
+                        break;
+                    case 
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
+                        strategy = new AllocateMessageQueueByConfig();
+                        break;
+                    case 
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
+                        strategy = new AllocateMessageQueueByMachineRoom();
+                        break;
+                    case 
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
+                        strategy = new AllocateMessageQueueConsistentHash();
+                        break;
+                    case 
AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
+                        strategy = new AllocateMessageQueueSticky(new 
HashMap<String, List<MessageQueue>>());
+                    default:
+                        break;
+                }
+            }
+            
this.brokerController.getAllocateMessageQueueStrategyTable().put(consumerGroup, 
strategy);
+        }

Review comment:
       Updated. A synchronized keyword had been applied to avoid race 
conditions.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to