henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278


   > > Testcase to reproduce the problem
   > > 
[LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
   > > When we call litePullConsumer.setMessageQueueListener(mql) the consumer 
cannot poll( ) messages from broker.
   > 
   > Thanks for your details about this issue, and I read your code listed 
above, Rocketmq's subscribe() mode does not support custom 
MessageQueueListener, because in subscribe mode, pull consumer will 
automatically do load balancing. If you change this, it will not work. If you 
don't want rocketmq to manage load balancing, assign() mode will be a better 
choice.
   > 
   > But this setMessageQueueListener() is really best to be hidden from users, 
are you willing to submit a PR to optimize it?
   
   I think the litePullConsumer should provide the ability to listen  queue 
rebalance event.
   
   I suggest to keep the messageQueueListener  in litePullConsumer. And let the 
original messageQueueListener  be a inner default messageQueueListenerInner ,  
the RebalanceLitePullImpl call the messageQueueListenerInner (It will never be 
overrided by developer). Then the messageQueueListenerInner  call 
messageQueueListener seted by developer via 
litePullConsumer.setMessageQueueListener()  
   
   Sth like this: 
   ```
   //DefaultLitePullConsumerImpl.java
   private MessageQueueListener messageQueueListenerInner = new 
MessageQueueListenerImpl();
   class MessageQueueListenerImpl implements MessageQueueListener {
   
           @Override
           public void messageQueueChanged(String topic, Set<MessageQueue> 
mqAll, Set<MessageQueue> mqDivided) {
               MessageModel messageModel = 
defaultLitePullConsumer.getMessageModel();
               MessageQueueListener messageQueueListener = 
defaultLitePullConsumer.getMessageQueueListener();
               switch (messageModel) {
                   case BROADCASTING:
                       updateAssignedMessageQueue(topic, mqAll);
                       updatePullTask(topic, mqAll);
                       if(null != messageQueueListener){
                           try {
                               
messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);
                           } catch (Throwable e) {
                               log.error("messageQueueChanged exception", e);
                           }
                       }
                       break;
                   case CLUSTERING:
                       updateAssignedMessageQueue(topic, mqDivided);
                       updatePullTask(topic, mqDivided);
                       if(null != messageQueueListener){
                           try {
                               
messageQueueListener.messageQueueChanged(topic,mqAll,mqDivided);
                           } catch (Throwable e) {
                               log.error("messageQueueChanged exception", e);
                           }
                       }
                       break;
                   default:
                       break;
               }
           }
       }
   
   
   
   //RebalanceLitePullImpl.java
   @Override
       public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, 
Set<MessageQueue> mqDivided) {
           MessageQueueListener messageQueueListener = 
this.litePullConsumerImpl./*getDefaultLitePullConsumer().*/getMessageQueueListenerInner();
           if (messageQueueListener != null) {
               try {
                   messageQueueListener.messageQueueChanged(topic, mqAll, 
mqDivided);
               } catch (Throwable e) {
                   log.error("messageQueueChanged exception", e);
               }
           }
       }
   
   ```
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to