This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5809c3a7c4 [INLONG-8385][DataProxy] Add take method in BufferQueue 
class (#8386)
5809c3a7c4 is described below

commit 5809c3a7c4866a36c6c9447ac467a1761005903b
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Jun 30 17:34:36 2023 +0800

    [INLONG-8385][DataProxy] Add take method in BufferQueue class (#8386)
---
 .../sink/mq/MessageQueueZoneProducer.java          |  2 +-
 .../dataproxy/sink/mq/MessageQueueZoneSink.java    |  4 ++++
 .../dataproxy/sink/mq/MessageQueueZoneWorker.java  |  2 +-
 .../apache/inlong/dataproxy/utils/BufferQueue.java | 23 ++++++++++++++++++++++
 4 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index 358165a3af..8392d2d150 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -69,7 +69,7 @@ public class MessageQueueZoneProducer {
      */
     public void start() {
         try {
-            logger.info("start MessageQueueZoneProducer:{}", 
zoneSink.getName());
+            logger.info("{} start MessageQueueZoneProducer", 
zoneSink.getName());
             this.reloadMetaConfig();
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 76e50d697e..d287c2df59 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -281,6 +281,10 @@ public class MessageQueueZoneSink extends AbstractSink 
implements Configurable,
         return this.dispatchQueue.pollRecord();
     }
 
+    public PackProfile takeDispatchedRecord() {
+        return this.dispatchQueue.takeRecord();
+    }
+
     public void releaseAcquiredSizePermit(PackProfile record) {
         this.dispatchQueue.release(record.getSize());
     }
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
index ad61167295..524c322db4 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
@@ -78,7 +78,7 @@ public class MessageQueueZoneWorker extends Thread {
         PackProfile profile = null;
         while (status != LifecycleState.STOP) {
             try {
-                profile = this.mqZoneSink.pollDispatchedRecord();
+                profile = this.mqZoneSink.takeDispatchedRecord();
                 if (profile == null) {
                     this.sleepOneInterval();
                     continue;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
index 15eea923f5..cf58fc12dc 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
@@ -30,6 +30,7 @@ public class BufferQueue<A> {
     private SizeSemaphore globalTokens = null;
     private final AtomicLong offerCount = new AtomicLong(0);
     private final AtomicLong pollCount = new AtomicLong(0);
+    private final AtomicLong takeCount = new AtomicLong(0);
 
     /**
      * Constructor
@@ -61,6 +62,19 @@ public class BufferQueue<A> {
         return record;
     }
 
+    /**
+     * Take record
+     */
+    public A takeRecord() {
+        try {
+            A record = queue.take();
+            this.takeCount.incrementAndGet();
+            return record;
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
     /**
      * offer
      */
@@ -171,4 +185,13 @@ public class BufferQueue<A> {
     public long getPollCount() {
         return pollCount.getAndSet(0);
     }
+
+    /**
+     * get takeCount
+     *
+     * @return the take count
+     */
+    public long getTakeCount() {
+        return takeCount.getAndSet(0);
+    }
 }

Reply via email to