henrypoter edited a comment on issue #2127: URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-655238278
> > Testcase to reproduce the problem
> > [LitePullConsumerSubscribe3.java.txt](https://github.com/apache/rocketmq/files/4856365/LitePullConsumerSubscribe3.java.txt)
> > When we call litePullConsumer.setMessageQueueListener(mql), the consumer cannot poll() messages from the broker.
>
> Thanks for the details about this issue. I read the code listed above. RocketMQ's subscribe() mode does not support a custom MessageQueueListener, because in subscribe mode the pull consumer does load balancing automatically. If you change this, it will not work. If you don't want RocketMQ to manage load balancing, assign() mode will be a better choice.
>
> But setMessageQueueListener() really is best hidden from users. Are you willing to submit a PR to optimize it?

I think litePullConsumer should provide the ability to listen for queue rebalance events.

I suggest keeping messageQueueListener in litePullConsumer and turning the original listener into an inner default listener, messageQueueListenerInner. RebalanceLitePullImpl would call messageQueueListenerInner, which can never be overridden by the developer.
messageQueueListenerInner would then call the messageQueueListener set by the developer via litePullConsumer.setMessageQueueListener().

Something like this:

```
// DefaultLitePullConsumerImpl.java
private MessageQueueListener messageQueueListenerInner = new MessageQueueListenerImpl();

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
        // Optional listener registered by the developer; may be null.
        MessageQueueListener messageQueueListener = defaultLitePullConsumer.getMessageQueueListener();
        switch (messageModel) {
            case BROADCASTING:
                updateAssignedMessageQueue(topic, mqAll);
                updatePullTask(topic, mqAll);
                if (null != messageQueueListener) {
                    try {
                        messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
                    } catch (Throwable e) {
                        log.error("messageQueueChanged exception", e);
                    }
                }
                break;
            case CLUSTERING:
                updateAssignedMessageQueue(topic, mqDivided);
                updatePullTask(topic, mqDivided);
                if (null != messageQueueListener) {
                    try {
                        messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
                    } catch (Throwable e) {
                        log.error("messageQueueChanged exception", e);
                    }
                }
                break;
            default:
                break;
        }
    }
}

// RebalanceLitePullImpl.java
@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    // Always go through the inner listener, never the developer's listener directly.
    MessageQueueListener messageQueueListener =
        this.litePullConsumerImpl./*getDefaultLitePullConsumer().*/getMessageQueueListenerInner();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}
```
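
To show the delegation idea in isolation, here is a minimal, self-contained sketch. It does not use any RocketMQ classes: `QueueListener`, `InnerListener`, `setUserListener` and `assigned` are hypothetical stand-ins (queues are plain strings), and only the CLUSTERING-style branch is modelled. The point is just that the inner listener always does its own bookkeeping first, then forwards the event, and that a failing developer callback cannot break rebalancing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ListenerDelegationSketch {

    /** Hypothetical stand-in for MessageQueueListener; queues are plain strings here. */
    public interface QueueListener {
        void queuesChanged(String topic, Set<String> all, Set<String> divided);
    }

    /** Plays the role of the proposed messageQueueListenerInner. */
    public static class InnerListener implements QueueListener {
        private final Set<String> assigned = new HashSet<>();
        private volatile QueueListener userListener; // optional, set by the developer

        public void setUserListener(QueueListener listener) {
            this.userListener = listener;
        }

        /** Returns a copy of the queues currently assigned to this consumer. */
        public Set<String> assigned() {
            return new HashSet<>(assigned);
        }

        @Override
        public void queuesChanged(String topic, Set<String> all, Set<String> divided) {
            // Internal bookkeeping always runs, whether or not a user listener is set.
            assigned.clear();
            assigned.addAll(divided);

            // Then forward the event; a failing user callback is logged, not propagated.
            QueueListener listener = userListener;
            if (listener != null) {
                try {
                    listener.queuesChanged(topic, all, divided);
                } catch (RuntimeException e) {
                    System.err.println("user listener failed: " + e.getMessage());
                }
            }
        }
    }

    public static void main(String[] args) {
        InnerListener inner = new InnerListener();
        List<String> seen = new ArrayList<>();

        // A developer-supplied listener that records the event and then misbehaves.
        inner.setUserListener((topic, all, divided) -> {
            seen.add(topic + ":" + divided.size());
            throw new IllegalStateException("user callback failed");
        });

        Set<String> all = new HashSet<>(Arrays.asList("q0", "q1", "q2", "q3"));
        Set<String> divided = new HashSet<>(Arrays.asList("q0", "q1"));
        inner.queuesChanged("TopicTest", all, divided);

        System.out.println(inner.assigned().size() + " queues assigned, user saw " + seen);
    }
}
```

With this shape, setting or replacing the developer's listener never touches the code path that updates the assigned queues, which is the property the subscribe() mode needs.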
