This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 43730b7 [ROCKETMQ-96]Rename some temp variable and field closes
apache/rocketmq#60
43730b7 is described below
commit 43730b783384514ec18d2abf0010303988bfc373
Author: Jaskey <[email protected]>
AuthorDate: Wed Dec 13 19:54:51 2017 +0800
[ROCKETMQ-96]Rename some temp variable and field closes apache/rocketmq#60
---
.../client/impl/consumer/ProcessQueue.java | 31 ++++++++++++----------
.../impl/producer/DefaultMQProducerImpl.java | 6 ++---
2 files changed, 20 insertions(+), 17 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 4176520..0cea1ae 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -48,7 +48,10 @@ public class ProcessQueue {
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
- private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long,
MessageExt>();
+ /**
+ * A subset of msgTreeMap, will only be used when orderly consume
+ */
+ private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new
TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
@@ -243,8 +246,8 @@ public class ProcessQueue {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
- this.msgTreeMap.putAll(this.msgTreeMapTemp);
- this.msgTreeMapTemp.clear();
+ this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
+ this.consumingMsgOrderlyTreeMap.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
@@ -257,12 +260,12 @@ public class ProcessQueue {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
- Long offset = this.msgTreeMapTemp.lastKey();
- msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
- for (MessageExt msg : this.msgTreeMapTemp.values()) {
+ Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
+ msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
+ for (MessageExt msg :
this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
- this.msgTreeMapTemp.clear();
+ this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
@@ -281,7 +284,7 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
- this.msgTreeMapTemp.remove(msg.getQueueOffset());
+
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
@@ -304,7 +307,7 @@ public class ProcessQueue {
Map.Entry<Long, MessageExt> entry =
this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
- msgTreeMapTemp.put(entry.getKey(),
entry.getValue());
+ consumingMsgOrderlyTreeMap.put(entry.getKey(),
entry.getValue());
} else {
break;
}
@@ -343,7 +346,7 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.clear();
- this.msgTreeMapTemp.clear();
+ this.consumingMsgOrderlyTreeMap.clear();
this.msgCount.set(0);
this.msgSize.set(0);
this.queueOffsetMax = 0L;
@@ -402,10 +405,10 @@ public class ProcessQueue {
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 *
1024)));
}
- if (!this.msgTreeMapTemp.isEmpty()) {
-
info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
- info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
- info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+ if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
+
info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
+
info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey());
+
info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size());
}
info.setLocked(this.locked);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 35b905e..7c16979 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue tmpmq =
this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
- if (tmpmq != null) {
- mq = tmpmq;
+ MessageQueue mqSelected =
this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+ if (mqSelected != null) {
+ mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].