This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/litePullConsumer by this push:
     new 46288ab  Polish lite pull consumer and fix bug (#1373)
46288ab is described below

commit 46288abb33e7f4cf9ca59dbfbff12be96fd8494a
Author: King <[email protected]>
AuthorDate: Fri Aug 9 11:26:53 2019 +0800

    Polish lite pull consumer and fix bug (#1373)
    
    * fix unsubscribe code
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * polish commit consumed offset
    
    * pass checkstyle
    
    * pass checkstyle
    
    * polish LiteMQPullConsumer
    
    * add flow control and polish commit logic
    
    * fix bug
    
    * polish code
    
    * fix commit consumed offset back
    
    * refactor litePullConsumer
    
    * development save
    
    * development save
    
    * Refactor DefaultLitePullConsumer and DefaultLitePullConsumerImpl.
    
    * Polish lite pull consumer
    
    * polish lite pull consumer
    
    * polish lite pull consumer
    
    * fix seek
    
    * fix seek function
    
    * polish lite pull consumer
    
    * add apache header
    
    * add test
    
    * polish test
    
    * Make broadcast model work for litePullConsumer
    
    * Revert example/broadcast/PushConsumer.java
    
    * Add delay time when no new message
    
    * Enable long polling mode
    
    * Fix subscribe bug when rebalance
    
    * Delete useless consumeMessageHook
---
 .../client/consumer/DefaultLitePullConsumer.java   |  15 +-
 .../client/impl/consumer/AssignedMessageQueue.java |  44 ++++--
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 159 ++++++---------------
 .../rocketmq/example/broadcast/PushConsumer.java   |   2 +-
 4 files changed, 80 insertions(+), 140 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 7f65713..1858fa1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -17,9 +17,7 @@
 package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.rocketmq.client.ClientConfig;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -69,10 +67,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
      * Offset Storage
      */
     private OffsetStore offsetStore;
-    /**
-     * Topic set you want to register
-     */
-    private Set<String> registerTopics = new HashSet<String>();
+
     /**
      * Queue allocation algorithm
      */
@@ -372,14 +367,6 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
         this.messageQueueListener = messageQueueListener;
     }
 
-    public Set<String> getRegisterTopics() {
-        return registerTopics;
-    }
-
-    public void setRegisterTopics(Set<String> registerTopics) {
-        this.registerTopics = withNamespace(registerTopics);
-    }
-
     public long getConsumerPullTimeoutMillis() {
         return consumerPullTimeoutMillis;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index aa8379e..b21fd01 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.client.impl.consumer;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.message.MessageQueue;
@@ -37,7 +38,7 @@ public class AssignedMessageQueue {
         this.rebalanceImpl = rebalanceImpl;
     }
 
-    public Collection<MessageQueue> messageQueues() {
+    public Set<MessageQueue> messageQueues() {
         return assignedMessageQueueState.keySet();
     }
 
@@ -130,6 +131,23 @@ public class AssignedMessageQueue {
         return null;
     }
 
+    public void updateAssignedMessageQueue(String topic, 
Collection<MessageQueue> assigned) {
+        synchronized (this.assignedMessageQueueState) {
+            Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = 
this.assignedMessageQueueState.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<MessageQueue, MessageQueueStat> next = it.next();
+                if (next.getKey().getTopic().equals(topic)) {
+                    if (!assigned.contains(next.getKey())) {
+                        System.out.printf("MessageQueue-%s is removed %n", 
next.getKey());
+                        next.getValue().getProcessQueue().setDropped(true);
+                        it.remove();
+                    }
+                }
+            }
+            addAssignedMessageQueue(assigned);
+        }
+    }
+
     public void updateAssignedMessageQueue(Collection<MessageQueue> assigned) {
         synchronized (this.assignedMessageQueueState) {
             Iterator<Map.Entry<MessageQueue, MessageQueueStat>> it = 
this.assignedMessageQueueState.entrySet().iterator();
@@ -140,18 +158,22 @@ public class AssignedMessageQueue {
                     it.remove();
                 }
             }
+            addAssignedMessageQueue(assigned);
+        }
+    }
 
-            for (MessageQueue messageQueue : assigned) {
-                if (!this.assignedMessageQueueState.containsKey(messageQueue)) 
{
-                    MessageQueueStat messageQueueStat;
-                    if (rebalanceImpl != null && 
rebalanceImpl.processQueueTable.get(messageQueue) != null) {
-                        messageQueueStat = new MessageQueueStat(messageQueue, 
rebalanceImpl.processQueueTable.get(messageQueue));
-                    } else {
-                        ProcessQueue processQueue = new ProcessQueue();
-                        messageQueueStat = new MessageQueueStat(messageQueue, 
processQueue);
-                    }
-                    this.assignedMessageQueueState.put(messageQueue, 
messageQueueStat);
+    private void addAssignedMessageQueue(Collection<MessageQueue> assigned) {
+        for (MessageQueue messageQueue : assigned) {
+            if (!this.assignedMessageQueueState.containsKey(messageQueue)) {
+                MessageQueueStat messageQueueStat;
+                if (rebalanceImpl != null && 
rebalanceImpl.getProcessQueueTable().get(messageQueue) != null) {
+                    System.out.printf("MessageQueue-%s is added %n", 
messageQueue);
+                    messageQueueStat = new MessageQueueStat(messageQueue, 
rebalanceImpl.getProcessQueueTable().get(messageQueue));
+                } else {
+                    ProcessQueue processQueue = new ProcessQueue();
+                    messageQueueStat = new MessageQueueStat(messageQueue, 
processQueue);
                 }
+                this.assignedMessageQueueState.put(messageQueue, 
messageQueueStat);
             }
         }
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 74cf644..07ef1cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -37,15 +37,12 @@ import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.hook.ConsumeMessageContext;
-import org.apache.rocketmq.client.hook.ConsumeMessageHook;
 import org.apache.rocketmq.client.hook.FilterMessageHook;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.MQClientManager;
@@ -55,15 +52,10 @@ import org.apache.rocketmq.common.CountDownLatch2;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ServiceState;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.filter.ExpressionType;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
-
-import org.apache.rocketmq.common.message.Message;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
@@ -74,7 +66,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
 public class DefaultLitePullConsumerImpl implements MQConsumerInner {
@@ -85,8 +76,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     private final RPCHook rpcHook;
 
-    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new 
ArrayList<ConsumeMessageHook>();
-
     private final ArrayList<FilterMessageHook> filterMessageHookList = new 
ArrayList<FilterMessageHook>();
 
     private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -122,6 +111,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
      * Delay some time when suspend pull service
      */
     private static final long PULL_TIME_DELAY_MILLS_WHEN_PAUSE = 1000;
+    /**
+     * Delay some time when no new message
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG = 0;
 
     private DefaultLitePullConsumer defaultLitePullConsumer;
 
@@ -143,10 +136,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     private long nextAutoCommitDeadline = -1L;
 
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer 
defaultLitePullConsumer, final RPCHook rpcHook) {
-
         this.defaultLitePullConsumer = defaultLitePullConsumer;
         this.rpcHook = rpcHook;
-
     }
 
     private void checkServiceState() {
@@ -162,8 +153,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     private void updateAssignedMessageQueue(String topic, Set<MessageQueue> 
assignedMessageQueue) {
-        
this.assignedMessageQueue.updateAssignedMessageQueue(assignedMessageQueue);
-        updatePullTask(topic, assignedMessageQueue);
+        this.assignedMessageQueue.updateAssignedMessageQueue(topic, 
assignedMessageQueue);
     }
 
     private void updatePullTask(String topic, Set<MessageQueue> mqNewSet) {
@@ -187,9 +177,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             switch (messageModel) {
                 case BROADCASTING:
                     updateAssignedMessageQueue(topic, mqAll);
+                    updatePullTask(topic, mqAll);
                     break;
                 case CLUSTERING:
                     updateAssignedMessageQueue(topic, mqDivided);
+                    updatePullTask(topic, mqDivided);
                     break;
                 default:
                     break;
@@ -356,13 +348,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     private void copySubscription() throws MQClientException {
         try {
-            Set<String> registerTopics = 
this.defaultLitePullConsumer.getRegisterTopics();
-            if (registerTopics != null) {
-                for (final String topic : registerTopics) {
-                    SubscriptionData subscriptionData = 
FilterAPI.buildSubscriptionData(this.defaultLitePullConsumer.getConsumerGroup(),
-                        topic, SubscriptionData.SUB_ALL);
-                    this.rebalanceImpl.getSubscriptionInner().put(topic, 
subscriptionData);
-                }
+            switch (this.defaultLitePullConsumer.getMessageModel()) {
+                case BROADCASTING:
+                    break;
+                case CLUSTERING:
+                    /*
+                     * Retry topic support in the future.
+                     */
+                    break;
+                default:
+                    break;
             }
         } catch (Exception e) {
             throw new MQClientException("subscription exception", e);
@@ -421,7 +416,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     public synchronized void unsubscribe(final String topic) {
         this.rebalanceImpl.getSubscriptionInner().remove(topic);
-        //can be delete
         removePullTaskCallback(topic);
         assignedMessageQueue.removeAssignedMessageQueue(topic);
     }
@@ -484,8 +478,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     public synchronized void seek(MessageQueue messageQueue, long offset) 
throws MQClientException {
-        if (!assignedMessageQueue.messageQueues().contains(messageQueue))
-            throw new MQClientException("The message queue is not in assigned 
list, message queue: " + messageQueue, null);
+        if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+            if (subscriptionType == SubscriptionType.SUBSCRIBE) {
+                throw new MQClientException("The message queue is not in 
assigned list, may be rebalancing, message queue: " + messageQueue, null);
+            } else {
+                throw new MQClientException("The message queue is not in 
assigned list, message queue: " + messageQueue, null);
+            }
+        }
         long minOffset = minOffset(messageQueue);
         long maxOffset = maxOffset(messageQueue);
         if (offset < minOffset || offset > maxOffset)
@@ -552,6 +551,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     }
                 }
             }
+            if (defaultLitePullConsumer.getMessageModel() == 
MessageModel.BROADCASTING)
+                offsetStore.persistAll(assignedMessageQueue.messageQueues());
         } catch (Exception e) {
             log.error("An error occurred when update consume offset 
synchronously.", e);
         }
@@ -570,6 +571,8 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                     }
                 }
             }
+            if (defaultLitePullConsumer.getMessageModel() == 
MessageModel.BROADCASTING)
+                offsetStore.persistAll(assignedMessageQueue.messageQueues());
         } catch (Exception e) {
             log.error("An error occurred when update consume offset 
Automatically.");
         }
@@ -605,6 +608,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             offset = assignedMessageQueue.getPullOffset(remoteQueue);
             if (offset == -1) {
                 offset = fetchConsumeOffset(remoteQueue, false);
+                if (offset == -1 && defaultLitePullConsumer.getMessageModel() 
== MessageModel.BROADCASTING) {
+                    offset = 0;
+                }
                 assignedMessageQueue.updatePullOffset(remoteQueue, offset);
                 assignedMessageQueue.updateConsumeOffset(remoteQueue, offset);
             }
@@ -706,6 +712,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                         case OFFSET_ILLEGAL:
                             log.warn("the pull request offset illegal, {}", 
pullResult.toString());
                             break;
+                        case NO_NEW_MSG:
+                            pullDelayTimeMills = 
PULL_TIME_DELAY_MILLS_WHEN_NO_NEW_MSG;
+                            break;
                         default:
                             break;
                     }
@@ -745,7 +754,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     private PullResult pull(MessageQueue mq, String subExpression, long 
offset, int maxNums, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
         SubscriptionData subscriptionData = getSubscriptionData(mq, 
subExpression);
-        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, 
timeout);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, 
timeout);
     }
 
     private PullResult pull(MessageQueue mq, MessageSelector messageSelector, 
long offset, int maxNums)
@@ -756,7 +765,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     private PullResult pull(MessageQueue mq, MessageSelector messageSelector, 
long offset, int maxNums, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, 
InterruptedException {
         SubscriptionData subscriptionData = getSubscriptionData(mq, 
messageSelector);
-        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, 
timeout);
+        return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, 
timeout);
     }
 
     private SubscriptionData getSubscriptionData(MessageQueue mq, String 
subExpression)
@@ -830,43 +839,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         this.pullAPIWrapper.processPullResult(mq, pullResult, 
subscriptionData);
         //If namespace not null , reset Topic without namespace.
         this.resetTopic(pullResult.getMsgFoundList());
-        if (!this.consumeMessageHookList.isEmpty()) {
-            ConsumeMessageContext consumeMessageContext = null;
-            consumeMessageContext = new ConsumeMessageContext();
-            
consumeMessageContext.setNamespace(defaultLitePullConsumer.getNamespace());
-            consumeMessageContext.setConsumerGroup(this.groupName());
-            consumeMessageContext.setMq(mq);
-            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
-            consumeMessageContext.setSuccess(false);
-            this.executeHookBefore(consumeMessageContext);
-            
consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
-            consumeMessageContext.setSuccess(true);
-            this.executeHookAfter(consumeMessageContext);
-        }
         return pullResult;
     }
 
-    private void executeHookBefore(final ConsumeMessageContext context) {
-        if (!this.consumeMessageHookList.isEmpty()) {
-            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-                try {
-                    hook.consumeMessageBefore(context);
-                } catch (Throwable ignored) {
-                }
-            }
-        }
-    }
-
-    private void executeHookAfter(final ConsumeMessageContext context) {
-        if (!this.consumeMessageHookList.isEmpty()) {
-            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-                try {
-                    hook.consumeMessageAfter(context);
-                } catch (Throwable ignored) {
-                }
-            }
-        }
-    }
 
     public void resetTopic(List<MessageExt> msgList) {
         if (null == msgList || msgList.size() == 0) {
@@ -920,25 +895,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     @Override
     public Set<SubscriptionData> subscriptions() {
-        Set<SubscriptionData> result = new HashSet<SubscriptionData>();
-
-        Set<String> topics = this.defaultLitePullConsumer.getRegisterTopics();
-        if (topics != null) {
-            synchronized (topics) {
-                for (String t : topics) {
-                    SubscriptionData ms = null;
-                    try {
-                        ms = FilterAPI.buildSubscriptionData(this.groupName(), 
t, SubscriptionData.SUB_ALL);
-                    } catch (Exception e) {
-                        log.error("parse subscription error", e);
-                    }
-                    ms.setSubVersion(0L);
-                    result.add(ms);
-                }
-            }
-        }
+        Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();
 
-        return result;
+        subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
+
+        return subSet;
     }
 
     @Override
@@ -1000,41 +961,6 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         return info;
     }
 
-    private void sendMessageBack(MessageExt msg, int delayLevel, final String 
brokerName)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        sendMessageBack(msg, delayLevel, brokerName, 
this.defaultLitePullConsumer.getConsumerGroup());
-    }
-
-    private void sendMessageBack(MessageExt msg, int delayLevel, final String 
brokerName, String consumerGroup)
-        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
-        try {
-            String brokerAddr = (null != brokerName) ? 
this.mQClientFactory.findBrokerAddressInPublish(brokerName)
-                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
-
-            if (UtilAll.isBlank(consumerGroup)) {
-                consumerGroup = 
this.defaultLitePullConsumer.getConsumerGroup();
-            }
-
-            
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, 
msg, consumerGroup, delayLevel, 3000,
-                this.defaultLitePullConsumer.getMaxReconsumeTimes());
-        } catch (Exception e) {
-            log.error("sendMessageBack Exception, " + 
this.defaultLitePullConsumer.getConsumerGroup(), e);
-
-            Message newMsg = new 
Message(MixAll.getRetryTopic(this.defaultLitePullConsumer.getConsumerGroup()), 
msg.getBody());
-            String originMsgId = MessageAccessor.getOriginMessageId(msg);
-            MessageAccessor.setOriginMessageId(newMsg, 
UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
-            newMsg.setFlag(msg.getFlag());
-            MessageAccessor.setProperties(newMsg, msg.getProperties());
-            MessageAccessor.putProperty(newMsg, 
MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
-            MessageAccessor.setReconsumeTime(newMsg, 
String.valueOf(msg.getReconsumeTimes() + 1));
-            MessageAccessor.setMaxReconsumeTimes(newMsg, 
String.valueOf(this.defaultLitePullConsumer.getMaxReconsumeTimes()));
-            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
-            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
-        } finally {
-            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), 
this.defaultLitePullConsumer.getNamespace()));
-        }
-    }
-
     private void updateConsumeOffsetToBroker(MessageQueue mq, long offset, 
boolean isOneway) throws RemotingException,
         MQBrokerException, InterruptedException, MQClientException {
         this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
@@ -1044,6 +970,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         return offsetStore;
     }
 
+    public void registerFilterMessageHook(final FilterMessageHook hook) {
+        this.filterMessageHookList.add(hook);
+        log.info("register FilterMessageHook Hook, {}", hook.hookName());
+    }
+
     public DefaultLitePullConsumer getDefaultLitePullConsumer() {
         return defaultLitePullConsumer;
     }
diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
index fb1f9bb..28e0234 100644
--- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java
@@ -50,4 +50,4 @@ public class PushConsumer {
         consumer.start();
         System.out.printf("Broadcast Consumer Started.%n");
     }
-}
+}
\ No newline at end of file

Reply via email to