This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/litePullConsumer by this push:
     new 62ca947  Polish LitePullConsumer (#1332)
62ca947 is described below

commit 62ca947a457769469189e6e0acd990a833b70ab8
Author: King <[email protected]>
AuthorDate: Fri Jul 19 11:19:07 2019 +0800

    Polish LitePullConsumer (#1332)
    
    * fix unsubscribe code
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * fix commit consumed offset
    
    * polish commit consumed offset
    
    * pass checkstyle
    
    * pass checkstyle
---
 .../client/consumer/DefaultLiteMQPullConsumer.java |  6 +-
 .../client/consumer/DefaultMQPullConsumer.java     |  3 +-
 .../rocketmq/client/consumer/MQPullConsumer.java   |  2 -
 .../client/impl/consumer/AssignedMessageQueue.java | 10 ----
 .../impl/consumer/LiteMQPullConsumerImpl.java      | 65 ++++++++++++++--------
 .../client/impl/consumer/ProcessQueue.java         | 12 ++++
 .../example/simple/LitePullConsumerTest.java       | 14 ++---
 7 files changed, 64 insertions(+), 48 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
index 96d4f5a..6f67bcf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -42,7 +42,7 @@ public class DefaultLiteMQPullConsumer extends 
DefaultMQPullConsumer implements
     /**
      * Maximum commit offset interval time in seconds.
      */
-    private long autoCommitInterval = 20;
+    private long autoCommitInterval = 5;
 
     public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
         this.setConsumerGroup(consumerGroup);
@@ -55,7 +55,7 @@ public class DefaultLiteMQPullConsumer extends 
DefaultMQPullConsumer implements
     }
 
     @Override
-    public void start() throws MQClientException{
+    public void start() throws MQClientException {
         this.liteMQPullConsumer.start();
     }
 
@@ -95,7 +95,7 @@ public class DefaultLiteMQPullConsumer extends 
DefaultMQPullConsumer implements
 
     @Override
     public void commitSync() {
-        this.liteMQPullConsumer.commit();
+        this.liteMQPullConsumer.commitSync();
     }
 
     public long getConsumeTimeout() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index dbf37d2..3fa3af2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,9 +16,7 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
@@ -40,6 +38,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
  * Default pulling consumer
  */
 public class DefaultMQPullConsumer extends ClientConfig implements 
MQPullConsumer {
+
     protected final transient DefaultMQPullConsumerImpl 
defaultMQPullConsumerImpl;
 
     /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 9c7cb36..a8e9628 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index e9623a8..fb0ca79 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
@@ -107,15 +106,6 @@ public class AssignedMessageQueue {
         }
     }
 
-    public Map<MessageQueue, Long> getNeedCommitOffsets() {
-        Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>();
-        Set<Map.Entry<MessageQueue, MessageQueueStat>> entries = 
this.assignedMessageQueueState.entrySet();
-        for (Map.Entry<MessageQueue, MessageQueueStat> entry : entries) {
-            map.put(entry.getKey(), entry.getValue().getNextOffset());
-        }
-        return map;
-    }
-
     public class MessageQueueStat {
         private MessageQueue messageQueue;
         private boolean paused = false;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index d612286..ab229e4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -16,9 +16,9 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
+
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
@@ -50,6 +51,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 
 public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
+
     private final InternalLogger log = ClientLogger.getLog();
 
     private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
@@ -59,7 +61,7 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
 
     private AssignedMessageQueue assignedMessageQueue = new 
AssignedMessageQueue();
 
-    private List<ConsumeRequest> allConsumed = new 
ArrayList<ConsumeRequest>(256);
+    private volatile Set<ConsumeRequest> consumedSet = new 
HashSet<ConsumeRequest>();
 
     private final BlockingQueue<ConsumeRequest> consumeRequestCache = new 
LinkedBlockingQueue<ConsumeRequest>();
 
@@ -69,6 +71,8 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
 
     private ScheduledExecutorService autoCommitExecutors;
 
+    private final ThreadLocal<ConsumeRequest> preConsumeRequestLocal = new 
ThreadLocal<ConsumeRequest>();
+
     public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer 
defaultMQPullConsumer, final RPCHook rpcHook) {
         super(defaultMQPullConsumer, rpcHook);
         this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
@@ -145,7 +149,7 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
             @Override
             public void run() {
                 if (defaultLiteMQPullConsumer.isAutoCommit()) {
-                    commit();
+                    commitAll();
                 }
             }
         }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), 
this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
@@ -164,7 +168,9 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
 
     public List<MessageExt> poll(long timeout) {
         try {
-            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, 
TimeUnit.SECONDS);
+            addToConsumed(preConsumeRequestLocal.get());
+            ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, 
TimeUnit.MILLISECONDS);
+            preConsumeRequestLocal.set(consumeRequest);
             if (consumeRequest != null) {
                 List<MessageExt> messages = consumeRequest.getMessageExts();
                 for (MessageExt messageExt : messages) {
@@ -173,7 +179,8 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
                 
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
                 return messages;
             }
-        } catch (InterruptedException ignore) {
+        } catch (InterruptedException e) {
+            log.error("poll ComsumeRequest error.", e);
         }
         return null;
     }
@@ -197,7 +204,7 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
     }
 
     public void unsubscribe(final String topic) {
-        unsubscribe(topic);
+        super.unsubscribe(topic);
         removePullTaskCallback(topic);
         assignedMessageQueue.removeAssignedMessageQueue(topic);
     }
@@ -212,30 +219,39 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
             while (it.hasNext()) {
                 Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
                 if (next.getKey().getTopic().equals(topic)) {
+                    next.getValue().setCancelled(true);
                     it.remove();
                 }
             }
         }
     }
 
-    public void commit() {
-        List<ConsumeRequest> consumeRequests;
-        synchronized (this.allConsumed) {
-            consumeRequests = this.allConsumed;
-            this.allConsumed = new ArrayList<ConsumeRequest>();
+    public void commitSync() {
+        addToConsumed(preConsumeRequestLocal.get());
+        preConsumeRequestLocal.set(null);
+        commitAll();
+    }
+
+    public void commitAll() {
+        Set<ConsumeRequest> consumedRequests;
+        synchronized (this.consumedSet) {
+            consumedRequests = this.consumedSet;
+            this.consumedSet = new HashSet<ConsumeRequest>();
         }
-        for (ConsumeRequest consumeRequest : consumeRequests) {
+        for (ConsumeRequest consumeRequest : consumedRequests) {
             
consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
         }
-        Set<Map.Entry<MessageQueue, Long>> entrySet = 
assignedMessageQueue.getNeedCommitOffsets().entrySet();
-        for (Map.Entry<MessageQueue, Long> entry : entrySet) {
+        Set<Map.Entry<MessageQueue, ProcessQueue>> entrySet = 
this.rebalanceImpl.getProcessQueueTable().entrySet();
+        for (Map.Entry<MessageQueue, ProcessQueue> entry : entrySet) {
             try {
-                updateConsumeOffset(entry.getKey(), entry.getValue());
+                long consumeOffset = entry.getValue().getConsumeOffset();
+                if (consumeOffset != -1)
+                    updateConsumeOffset(entry.getKey(), consumeOffset);
             } catch (MQClientException e) {
                 log.error("A error occurred in update consume offset 
process.", e);
             }
         }
-        
this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet());
+        
this.getOffsetStore().persistAll(this.rebalanceImpl.getProcessQueueTable().keySet());
     }
 
     private void commit(final MessageQueue messageQueue, final ProcessQueue 
processQueue, final MessageExt messageExt) {
@@ -260,7 +276,7 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
         }
     }
 
-    void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+    private void updatePullOffset(MessageQueue remoteQueue, long 
nextPullOffset) {
         try {
             assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
         } catch (MQClientException e) {
@@ -269,21 +285,23 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
     }
 
     private void addToConsumed(ConsumeRequest consumeRequest) {
-        synchronized (this.allConsumed) {
-            allConsumed.add(consumeRequest);
+        if (consumeRequest != null) {
+            synchronized (this.consumedSet) {
+                if (!consumedSet.contains(consumeRequest))
+                    consumedSet.add(consumeRequest);
+            }
         }
     }
 
-    void submitConsumeRequest(ConsumeRequest consumeRequest) {
+    private void submitConsumeRequest(ConsumeRequest consumeRequest) {
         try {
             consumeRequestCache.put(consumeRequest);
-            addToConsumed(consumeRequest);
         } catch (InterruptedException ex) {
             log.error("Submit consumeRequest error", ex);
         }
     }
 
-    long nextPullOffset(MessageQueue remoteQueue) {
+    private long nextPullOffset(MessageQueue remoteQueue) {
         long offset = -1;
         try {
             offset = assignedMessageQueue.getNextOffset(remoteQueue);
@@ -337,7 +355,7 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
                     this.defaultMQPullConsumer.sendMessageBack(msg, 3);
                     log.info("Send expired msg back. topic={}, msgId={}, 
storeHost={}, queueId={}, queueOffset={}",
                         msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), 
msg.getQueueId(), msg.getQueueOffset());
-                    System.out.println("Send expired msg back.");
+                    log.info("Send expired msg back.");
                     commit(mq, pq, msg);
                 } catch (Exception e) {
                     log.error("Send back expired msg exception", e);
@@ -364,7 +382,6 @@ public class LiteMQPullConsumerImpl extends 
DefaultMQPullConsumerImpl {
 
         @Override
         public void run() {
-            System.out.println("begin pull message");
             String topic = this.messageQueue.getTopic();
             if (!this.isCancelled()) {
                 if (assignedMessageQueue.isPaused(messageQueue)) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 0a52817..e9a1c72 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -431,4 +432,15 @@ public class ProcessQueue {
     public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
         this.lastConsumeTimestamp = lastConsumeTimestamp;
     }
+
+    public long getConsumeOffset() {
+
+        if (msgTreeMap.isEmpty() && queueOffsetMax == 0L)
+            return -1;
+
+        if (!msgTreeMap.isEmpty())
+            return msgTreeMap.firstKey();
+        else
+            return queueOffsetMax + 1;
+    }
 }
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 4297e4f..215763b 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -25,24 +25,24 @@ import org.apache.rocketmq.common.message.MessageQueue;
 public class LitePullConsumerTest {
     public static void main(String[] args) throws Exception {
         DefaultLiteMQPullConsumer litePullConsumer = new 
DefaultLiteMQPullConsumer("test", null);
-        litePullConsumer.subscribe("test", null);
+        litePullConsumer.setNamesrvAddr("localhost:9876");
+        litePullConsumer.subscribe("litepullconsumertest9", null);
         litePullConsumer.start();
-        MessageQueue messageQueue = new MessageQueue("test", 
"duhengdeMacBook-Pro.local", 1);
+        MessageQueue messageQueue = new MessageQueue("test", 
"IT-C02YW28FLVDL.local", 1);
         int i = 0;
         while (true) {
             List<MessageExt> messageExts = litePullConsumer.poll();
-            System.out.println("-----------");
-            System.out.println(messageExts);
-            System.out.println("-----------");
+            System.out.printf("%s%n", messageExts);
             i++;
             if (i == 3) {
-                System.out.println("pause");
+                System.out.printf("pause%n");
                 litePullConsumer.pause(Arrays.asList(messageQueue));
             }
             if (i == 10) {
-                System.out.println("resume");
+                System.out.printf("resume%n");
                 litePullConsumer.resume(Arrays.asList(messageQueue));
             }
+//
             litePullConsumer.commitSync();
         }
     }

Reply via email to