This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5809c3a7c4 [INLONG-8385][DataProxy] Add take method in BufferQueue class (#8386)
5809c3a7c4 is described below
commit 5809c3a7c4866a36c6c9447ac467a1761005903b
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Jun 30 17:34:36 2023 +0800
[INLONG-8385][DataProxy] Add take method in BufferQueue class (#8386)
---
.../sink/mq/MessageQueueZoneProducer.java | 2 +-
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 4 ++++
.../dataproxy/sink/mq/MessageQueueZoneWorker.java | 2 +-
.../apache/inlong/dataproxy/utils/BufferQueue.java | 23 ++++++++++++++++++++++
4 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
index 358165a3af..8392d2d150 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java
@@ -69,7 +69,7 @@ public class MessageQueueZoneProducer {
*/
public void start() {
try {
- logger.info("start MessageQueueZoneProducer:{}", zoneSink.getName());
+ logger.info("{} start MessageQueueZoneProducer", zoneSink.getName());
this.reloadMetaConfig();
} catch (Exception e) {
logger.error(e.getMessage(), e);
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 76e50d697e..d287c2df59 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -281,6 +281,10 @@ public class MessageQueueZoneSink extends AbstractSink implements Configurable,
return this.dispatchQueue.pollRecord();
}
+ public PackProfile takeDispatchedRecord() {
+ return this.dispatchQueue.takeRecord();
+ }
+
public void releaseAcquiredSizePermit(PackProfile record) {
this.dispatchQueue.release(record.getSize());
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
index ad61167295..524c322db4 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneWorker.java
@@ -78,7 +78,7 @@ public class MessageQueueZoneWorker extends Thread {
PackProfile profile = null;
while (status != LifecycleState.STOP) {
try {
- profile = this.mqZoneSink.pollDispatchedRecord();
+ profile = this.mqZoneSink.takeDispatchedRecord();
if (profile == null) {
this.sleepOneInterval();
continue;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
index 15eea923f5..cf58fc12dc 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/BufferQueue.java
@@ -30,6 +30,7 @@ public class BufferQueue<A> {
private SizeSemaphore globalTokens = null;
private final AtomicLong offerCount = new AtomicLong(0);
private final AtomicLong pollCount = new AtomicLong(0);
+ private final AtomicLong takeCount = new AtomicLong(0);
/**
* Constructor
@@ -61,6 +62,19 @@ public class BufferQueue<A> {
return record;
}
+ /**
+ * Take record
+ */
+ public A takeRecord() {
+ try {
+ A record = queue.take();
+ this.takeCount.incrementAndGet();
+ return record;
+ } catch (Throwable e) {
+ return null;
+ }
+ }
+
/**
* offer
*/
@@ -171,4 +185,13 @@ public class BufferQueue<A> {
public long getPollCount() {
return pollCount.getAndSet(0);
}
+
+ /**
+ * get takeCount
+ *
+ * @return the take count
+ */
+ public long getTakeCount() {
+ return takeCount.getAndSet(0);
+ }
}