This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 43730b7  [ROCKETMQ-96]Rename some temp variable and field closes apache/rocketmq#60
43730b7 is described below

commit 43730b783384514ec18d2abf0010303988bfc373
Author: Jaskey <[email protected]>
AuthorDate: Wed Dec 13 19:54:51 2017 +0800

    [ROCKETMQ-96]Rename some temp variable and field closes apache/rocketmq#60
---
 .../client/impl/consumer/ProcessQueue.java         | 31 ++++++++++++----------
 .../impl/producer/DefaultMQProducerImpl.java       |  6 ++---
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 4176520..0cea1ae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -48,7 +48,10 @@ public class ProcessQueue {
     private final AtomicLong msgCount = new AtomicLong();
     private final AtomicLong msgSize = new AtomicLong();
     private final Lock lockConsume = new ReentrantLock();
-    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
+    /**
+     * A subset of msgTreeMap, will only be used when orderly consume
+     */
+    private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
     private final AtomicLong tryUnlockTimes = new AtomicLong(0);
     private volatile long queueOffsetMax = 0L;
     private volatile boolean dropped = false;
@@ -243,8 +246,8 @@ public class ProcessQueue {
         try {
             this.lockTreeMap.writeLock().lockInterruptibly();
             try {
-                this.msgTreeMap.putAll(this.msgTreeMapTemp);
-                this.msgTreeMapTemp.clear();
+                this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
+                this.consumingMsgOrderlyTreeMap.clear();
             } finally {
                 this.lockTreeMap.writeLock().unlock();
             }
@@ -257,12 +260,12 @@ public class ProcessQueue {
         try {
             this.lockTreeMap.writeLock().lockInterruptibly();
             try {
-                Long offset = this.msgTreeMapTemp.lastKey();
-                msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
-                for (MessageExt msg : this.msgTreeMapTemp.values()) {
+                Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
+                msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
+                for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
                     msgSize.addAndGet(0 - msg.getBody().length);
                 }
-                this.msgTreeMapTemp.clear();
+                this.consumingMsgOrderlyTreeMap.clear();
                 if (offset != null) {
                     return offset + 1;
                 }
@@ -281,7 +284,7 @@ public class ProcessQueue {
             this.lockTreeMap.writeLock().lockInterruptibly();
             try {
                 for (MessageExt msg : msgs) {
-                    this.msgTreeMapTemp.remove(msg.getQueueOffset());
+                    this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                     this.msgTreeMap.put(msg.getQueueOffset(), msg);
                 }
             } finally {
@@ -304,7 +307,7 @@ public class ProcessQueue {
                         Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                         if (entry != null) {
                             result.add(entry.getValue());
-                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
+                            consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
                         } else {
                             break;
                         }
@@ -343,7 +346,7 @@ public class ProcessQueue {
             this.lockTreeMap.writeLock().lockInterruptibly();
             try {
                 this.msgTreeMap.clear();
-                this.msgTreeMapTemp.clear();
+                this.consumingMsgOrderlyTreeMap.clear();
                 this.msgCount.set(0);
                 this.msgSize.set(0);
                 this.queueOffsetMax = 0L;
@@ -402,10 +405,10 @@ public class ProcessQueue {
                 info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
             }
 
-            if (!this.msgTreeMapTemp.isEmpty()) {
-                info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
-                info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
-                info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+            if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
+                info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
+                info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey());
+                info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size());
             }
 
             info.setLocked(this.locked);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 35b905e..7c16979 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             String[] brokersSent = new String[timesTotal];
             for (; times < timesTotal; times++) {
                 String lastBrokerName = null == mq ? null : mq.getBrokerName();
-                MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
-                if (tmpmq != null) {
-                    mq = tmpmq;
+                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+                if (mqSelected != null) {
+                    mq = mqSelected;
                     brokersSent[times] = mq.getBrokerName();
                     try {
                         beginTimestampPrev = System.currentTimeMillis();

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to