This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new cfbca3e  [TUBEMQ-253] tube-consumer fetch-worker cpu used too high 
(#173)
cfbca3e is described below

commit cfbca3e84b6515873e105057aa69c3d2b43caec6
Author: gosonzhang <4675...@qq.com>
AuthorDate: Sun Jun 28 03:29:49 2020 +0000

    [TUBEMQ-253] tube-consumer fetch-worker cpu used too high (#173)
    
    * [TUBEMQ-253] tube-consumer fetch-worker cpu used too high
---
 .../apache/tubemq/client/consumer/RmtDataCache.java  | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git 
a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
 
b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
index 1aa704c..94aed53 100644
--- 
a/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
+++ 
b/tubemq-client/src/main/java/org/apache/tubemq/client/consumer/RmtDataCache.java
@@ -238,7 +238,7 @@ public class RmtDataCache implements Closeable {
             if (!partitionMap.isEmpty()) {
                 break;
             }
-            ThreadUtils.sleep(200);
+            ThreadUtils.sleep(300);
         } while (true);
         if (this.isClosed.get()) {
             return null;
@@ -249,20 +249,22 @@ public class RmtDataCache implements Closeable {
             if (this.isClosed.get()) {
                 return null;
             }
+            int cycleCnt = 0;
             String key = null;
             do {
-                key = indexPartition.poll();
-                if (key != null) {
-                    break;
+                if (!indexPartition.isEmpty()) {
+                    // If there are idle partitions, poll
+                    key = indexPartition.poll();
+                    if (key != null) {
+                        break;
+                    }
                 }
                 if (this.isClosed.get()) {
                     break;
                 }
-                if (!partitionMap.isEmpty()) {
-                    break;
-                }
-                ThreadUtils.sleep(200);
-            } while(true);
+                ThreadUtils.sleep(300);
+                //if no idle partitions to get, wait and cycle 500 times
+            } while(cycleCnt++ < 500);
             if (key == null) {
                 return null;
             }

Reply via email to