henrypoter edited a comment on issue #2127:
URL: https://github.com/apache/rocketmq/issues/2127#issuecomment-653790013


   > > > > litePullConsumer
   > > > 
   > > > 
   > > > Should litePullConsumer.start() be called after litePullConsumer.setMessageQueueListener(mql)?
   > > 
   > > 
   > > Neither order works.
   > > Below is the workaround:
   > > `consumer.subscribe(topic, "*"); consumer.start(); consumer.poll(); consumer.setMessageQueueListener(mql);`
   > 
   > This is the default MessageQueueListenerImpl. Can you spot any difference from yours?
   > 
   > ```
   > // org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl.MessageQueueListenerImpl
   > 
   >     class MessageQueueListenerImpl implements MessageQueueListener {
   >         @Override
   >         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
   >             MessageModel messageModel = defaultLitePullConsumer.getMessageModel();
   >             switch (messageModel) {
   >                 case BROADCASTING:
   >                     updateAssignedMessageQueue(topic, mqAll);
   >                     updatePullTask(topic, mqAll);
   >                     break;
   >                 case CLUSTERING:
   >                     updateAssignedMessageQueue(topic, mqDivided);
   >                     updatePullTask(topic, mqDivided);
   >                     break;
   >                 default:
   >                     break;
   >             }
   >         }
   >     }
   > ```
   
   I wrote a custom MessageQueueListener to monitor consumer rebalancing and reset the queue offsets myself.
   From the client code, the consumer installs the default MessageQueueListenerImpl whenever subscribe() is called, which replaces any listener set beforehand.
   ```
       public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
           try {
               if (topic == null || topic.equals("")) {
                   throw new IllegalArgumentException("Topic can not be null or empty.");
               }
               setSubscriptionType(SubscriptionType.SUBSCRIBE);
               SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                   topic, subExpression);
               this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
               this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
               assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
               if (serviceState == ServiceState.RUNNING) {
                   this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
                   updateTopicSubscribeInfoWhenSubscriptionChanged();
               }
           } catch (Exception e) {
               throw new MQClientException("subscribe exception", e);
           }
       }
   ```
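
To illustrate why the call order matters, here is a minimal, self-contained sketch. `ToyConsumer`, `QueueListener`, and the topic name `TopicTest` are made up for illustration and are not RocketMQ classes. The only behavior modelled is the one visible in the subscribe() code above: it installs the default listener unconditionally.

```java
public class ListenerOverwriteDemo {

    /** Hypothetical stand-in for MessageQueueListener; it only carries a label. */
    interface QueueListener {
        String label();
    }

    /** Toy consumer (not the RocketMQ class) that models only the listener field. */
    static class ToyConsumer {
        private QueueListener listener;

        void setMessageQueueListener(QueueListener listener) {
            this.listener = listener;
        }

        // Like the quoted subscribe(): always installs the default listener,
        // regardless of what was set before.
        void subscribe(String topic, String subExpression) {
            setMessageQueueListener(() -> "default");
        }

        String activeListener() {
            return listener == null ? "none" : listener.label();
        }
    }

    /** Custom listener set first: subscribe() replaces it. */
    static String setListenerThenSubscribe() {
        ToyConsumer consumer = new ToyConsumer();
        consumer.setMessageQueueListener(() -> "custom");
        consumer.subscribe("TopicTest", "*");
        return consumer.activeListener();
    }

    /** Custom listener set after subscribe(): it stays in place. */
    static String subscribeThenSetListener() {
        ToyConsumer consumer = new ToyConsumer();
        consumer.subscribe("TopicTest", "*");
        consumer.setMessageQueueListener(() -> "custom");
        return consumer.activeListener();
    }

    public static void main(String[] args) {
        System.out.println("set listener, then subscribe: " + setListenerThenSubscribe());
        System.out.println("subscribe, then set listener: " + subscribeThenSetListener());
    }
}
```

In this toy model, only the second ordering keeps the custom listener. That matches the workaround quoted earlier, where setMessageQueueListener(mql) is called after subscribe().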
   
   



