This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch litePullConsumer
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/litePullConsumer by this push:
new 62ca947 Polish LitePullConsumer (#1332)
62ca947 is described below
commit 62ca947a457769469189e6e0acd990a833b70ab8
Author: King <[email protected]>
AuthorDate: Fri Jul 19 11:19:07 2019 +0800
Polish LitePullConsumer (#1332)
* fix unsubscribe code
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* fix commit consumed offset
* polish commit consumed offset
* pass checkstyle
* pass checkstyle
---
.../client/consumer/DefaultLiteMQPullConsumer.java | 6 +-
.../client/consumer/DefaultMQPullConsumer.java | 3 +-
.../rocketmq/client/consumer/MQPullConsumer.java | 2 -
.../client/impl/consumer/AssignedMessageQueue.java | 10 ----
.../impl/consumer/LiteMQPullConsumerImpl.java | 65 ++++++++++++++--------
.../client/impl/consumer/ProcessQueue.java | 12 ++++
.../example/simple/LitePullConsumerTest.java | 14 ++---
7 files changed, 64 insertions(+), 48 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
index 96d4f5a..6f67bcf 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java
@@ -42,7 +42,7 @@ public class DefaultLiteMQPullConsumer extends
DefaultMQPullConsumer implements
/**
* Maximum commit offset interval time in seconds.
*/
- private long autoCommitInterval = 20;
+ private long autoCommitInterval = 5;
public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) {
this.setConsumerGroup(consumerGroup);
@@ -55,7 +55,7 @@ public class DefaultLiteMQPullConsumer extends
DefaultMQPullConsumer implements
}
@Override
- public void start() throws MQClientException{
+ public void start() throws MQClientException {
this.liteMQPullConsumer.start();
}
@@ -95,7 +95,7 @@ public class DefaultLiteMQPullConsumer extends
DefaultMQPullConsumer implements
@Override
public void commitSync() {
- this.liteMQPullConsumer.commit();
+ this.liteMQPullConsumer.commitSync();
}
public long getConsumeTimeout() {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index dbf37d2..3fa3af2 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -16,9 +16,7 @@
*/
package org.apache.rocketmq.client.consumer;
-import java.util.Collection;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
@@ -40,6 +38,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
* Default pulling consumer
*/
public class DefaultMQPullConsumer extends ClientConfig implements
MQPullConsumer {
+
protected final transient DefaultMQPullConsumerImpl
defaultMQPullConsumerImpl;
/**
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 9c7cb36..a8e9628 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.client.consumer;
-import java.util.Collection;
-import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index e9623a8..fb0ca79 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.impl.consumer;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -107,15 +106,6 @@ public class AssignedMessageQueue {
}
}
- public Map<MessageQueue, Long> getNeedCommitOffsets() {
- Map<MessageQueue, Long> map = new HashMap<MessageQueue, Long>();
- Set<Map.Entry<MessageQueue, MessageQueueStat>> entries =
this.assignedMessageQueueState.entrySet();
- for (Map.Entry<MessageQueue, MessageQueueStat> entry : entries) {
- map.put(entry.getKey(), entry.getValue().getNextOffset());
- }
- return map;
- }
-
public class MessageQueueStat {
private MessageQueue messageQueue;
private boolean paused = false;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
index d612286..ab229e4 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java
@@ -16,9 +16,9 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
+
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
@@ -50,6 +51,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl {
+
private final InternalLogger log = ClientLogger.getLog();
private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer;
@@ -59,7 +61,7 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
private AssignedMessageQueue assignedMessageQueue = new
AssignedMessageQueue();
- private List<ConsumeRequest> allConsumed = new
ArrayList<ConsumeRequest>(256);
+ private volatile Set<ConsumeRequest> consumedSet = new
HashSet<ConsumeRequest>();
private final BlockingQueue<ConsumeRequest> consumeRequestCache = new
LinkedBlockingQueue<ConsumeRequest>();
@@ -69,6 +71,8 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
private ScheduledExecutorService autoCommitExecutors;
+ private final ThreadLocal<ConsumeRequest> preConsumeRequestLocal = new
ThreadLocal<ConsumeRequest>();
+
public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer
defaultMQPullConsumer, final RPCHook rpcHook) {
super(defaultMQPullConsumer, rpcHook);
this.defaultLiteMQPullConsumer = defaultMQPullConsumer;
@@ -145,7 +149,7 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
@Override
public void run() {
if (defaultLiteMQPullConsumer.isAutoCommit()) {
- commit();
+ commitAll();
}
}
}, this.defaultLiteMQPullConsumer.getAutoCommitInterval(),
this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS);
@@ -164,7 +168,9 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
public List<MessageExt> poll(long timeout) {
try {
- ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout,
TimeUnit.SECONDS);
+ addToConsumed(preConsumeRequestLocal.get());
+ ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout,
TimeUnit.MILLISECONDS);
+ preConsumeRequestLocal.set(consumeRequest);
if (consumeRequest != null) {
List<MessageExt> messages = consumeRequest.getMessageExts();
for (MessageExt messageExt : messages) {
@@ -173,7 +179,8 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
return messages;
}
- } catch (InterruptedException ignore) {
+ } catch (InterruptedException e) {
+ log.error("poll ComsumeRequest error.", e);
}
return null;
}
@@ -197,7 +204,7 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
}
public void unsubscribe(final String topic) {
- unsubscribe(topic);
+ super.unsubscribe(topic);
removePullTaskCallback(topic);
assignedMessageQueue.removeAssignedMessageQueue(topic);
}
@@ -212,30 +219,39 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
while (it.hasNext()) {
Map.Entry<MessageQueue, PullTaskImpl> next = it.next();
if (next.getKey().getTopic().equals(topic)) {
+ next.getValue().setCancelled(true);
it.remove();
}
}
}
}
- public void commit() {
- List<ConsumeRequest> consumeRequests;
- synchronized (this.allConsumed) {
- consumeRequests = this.allConsumed;
- this.allConsumed = new ArrayList<ConsumeRequest>();
+ public void commitSync() {
+ addToConsumed(preConsumeRequestLocal.get());
+ preConsumeRequestLocal.set(null);
+ commitAll();
+ }
+
+ public void commitAll() {
+ Set<ConsumeRequest> consumedRequests;
+ synchronized (this.consumedSet) {
+ consumedRequests = this.consumedSet;
+ this.consumedSet = new HashSet<ConsumeRequest>();
}
- for (ConsumeRequest consumeRequest : consumeRequests) {
+ for (ConsumeRequest consumeRequest : consumedRequests) {
consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts);
}
- Set<Map.Entry<MessageQueue, Long>> entrySet =
assignedMessageQueue.getNeedCommitOffsets().entrySet();
- for (Map.Entry<MessageQueue, Long> entry : entrySet) {
+ Set<Map.Entry<MessageQueue, ProcessQueue>> entrySet =
this.rebalanceImpl.getProcessQueueTable().entrySet();
+ for (Map.Entry<MessageQueue, ProcessQueue> entry : entrySet) {
try {
- updateConsumeOffset(entry.getKey(), entry.getValue());
+ long consumeOffset = entry.getValue().getConsumeOffset();
+ if (consumeOffset != -1)
+ updateConsumeOffset(entry.getKey(), consumeOffset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset
process.", e);
}
}
-
this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet());
+
this.getOffsetStore().persistAll(this.rebalanceImpl.getProcessQueueTable().keySet());
}
private void commit(final MessageQueue messageQueue, final ProcessQueue
processQueue, final MessageExt messageExt) {
@@ -260,7 +276,7 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
}
}
- void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+ private void updatePullOffset(MessageQueue remoteQueue, long
nextPullOffset) {
try {
assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset);
} catch (MQClientException e) {
@@ -269,21 +285,23 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
}
private void addToConsumed(ConsumeRequest consumeRequest) {
- synchronized (this.allConsumed) {
- allConsumed.add(consumeRequest);
+ if (consumeRequest != null) {
+ synchronized (this.consumedSet) {
+ if (!consumedSet.contains(consumeRequest))
+ consumedSet.add(consumeRequest);
+ }
}
}
- void submitConsumeRequest(ConsumeRequest consumeRequest) {
+ private void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
- addToConsumed(consumeRequest);
} catch (InterruptedException ex) {
log.error("Submit consumeRequest error", ex);
}
}
- long nextPullOffset(MessageQueue remoteQueue) {
+ private long nextPullOffset(MessageQueue remoteQueue) {
long offset = -1;
try {
offset = assignedMessageQueue.getNextOffset(remoteQueue);
@@ -337,7 +355,7 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
this.defaultMQPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={},
storeHost={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(),
msg.getQueueId(), msg.getQueueOffset());
- System.out.println("Send expired msg back.");
+ log.info("Send expired msg back.");
commit(mq, pq, msg);
} catch (Exception e) {
log.error("Send back expired msg exception", e);
@@ -364,7 +382,6 @@ public class LiteMQPullConsumerImpl extends
DefaultMQPullConsumerImpl {
@Override
public void run() {
- System.out.println("begin pull message");
String topic = this.messageQueue.getTopic();
if (!this.isCancelled()) {
if (assignedMessageQueue.isPaused(messageQueue)) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 0a52817..e9a1c72 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
@@ -431,4 +432,15 @@ public class ProcessQueue {
public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
+
+ public long getConsumeOffset() {
+
+ if (msgTreeMap.isEmpty() && queueOffsetMax == 0L)
+ return -1;
+
+ if (!msgTreeMap.isEmpty())
+ return msgTreeMap.firstKey();
+ else
+ return queueOffsetMax + 1;
+ }
}
diff --git
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
index 4297e4f..215763b 100644
---
a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
+++
b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java
@@ -25,24 +25,24 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerTest {
public static void main(String[] args) throws Exception {
DefaultLiteMQPullConsumer litePullConsumer = new
DefaultLiteMQPullConsumer("test", null);
- litePullConsumer.subscribe("test", null);
+ litePullConsumer.setNamesrvAddr("localhost:9876");
+ litePullConsumer.subscribe("litepullconsumertest9", null);
litePullConsumer.start();
- MessageQueue messageQueue = new MessageQueue("test",
"duhengdeMacBook-Pro.local", 1);
+ MessageQueue messageQueue = new MessageQueue("test",
"IT-C02YW28FLVDL.local", 1);
int i = 0;
while (true) {
List<MessageExt> messageExts = litePullConsumer.poll();
- System.out.println("-----------");
- System.out.println(messageExts);
- System.out.println("-----------");
+ System.out.printf("%s%n", messageExts);
i++;
if (i == 3) {
- System.out.println("pause");
+ System.out.printf("pause%n");
litePullConsumer.pause(Arrays.asList(messageQueue));
}
if (i == 10) {
- System.out.println("resume");
+ System.out.printf("resume%n");
litePullConsumer.resume(Arrays.asList(messageQueue));
}
+//
litePullConsumer.commitSync();
}
}