This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 24bdfc1f9e [INLONG-9969][Agent] Release the memory semaphore of the
source only when the data is placed in the queue (#9971)
24bdfc1f9e is described below
commit 24bdfc1f9eec5f8ea6cf5551c3085c0886ee3d5c
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Apr 11 11:52:30 2024 +0800
[INLONG-9969][Agent] Release the memory semaphore of the source only when
the data is placed in the queue (#9971)
---
.../java/org/apache/inlong/agent/plugin/sources/KafkaSource.java | 7 ++++---
.../java/org/apache/inlong/agent/plugin/sources/LogFileSource.java | 3 ++-
.../java/org/apache/inlong/agent/plugin/sources/PulsarSource.java | 7 ++++---
3 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
index fa9034c6bc..62aa87433b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/KafkaSource.java
@@ -94,7 +94,7 @@ public class KafkaSource extends AbstractSource {
1L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new AgentThreadFactory("kafka-source"));
- private BlockingQueue<KafkaSource.SourceData> queue;
+ private BlockingQueue<SourceData> queue;
public InstanceProfile profile;
private int maxPackSize;
private String taskId;
@@ -213,13 +213,13 @@ public class KafkaSource extends AbstractSource {
break;
}
ConsumerRecords<String, byte[]> records =
kafkaConsumer.poll(Duration.ofMillis(1000));
- MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
if (records.isEmpty()) {
if (queue.isEmpty()) {
emptyCount.incrementAndGet();
} else {
emptyCount.set(0);
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
continue;
}
@@ -234,6 +234,7 @@ public class KafkaSource extends AbstractSource {
putIntoQueue(sourceData);
offset = record.offset();
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
kafkaConsumer.commitSync();
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
@@ -298,7 +299,7 @@ public class KafkaSource extends AbstractSource {
@Override
public Message read() {
- KafkaSource.SourceData sourceData = null;
+ SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 3ba0f28f9c..9f83f6fc06 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -476,13 +476,13 @@ public class LogFileSource extends AbstractSource {
} catch (IOException e) {
LOGGER.error("readFromPos error: ", e);
}
- MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (lines.isEmpty()) {
if (queue.isEmpty()) {
emptyCount++;
} else {
emptyCount = 0;
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
continue;
}
@@ -494,6 +494,7 @@ public class LogFileSource extends AbstractSource {
}
putIntoQueue(lines.get(i));
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
lastPrintTime = AgentUtils.getCurrentTime();
LOGGER.info("path is {}, linePosition {}, bytePosition is {} file len {}, reads lines size {}",
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
index ebd8495d9f..c64653e2a3 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java
@@ -185,13 +185,13 @@ public class PulsarSource extends AbstractSource {
break;
}
org.apache.pulsar.client.api.Message<byte[]> message =
consumer.receive(0, TimeUnit.MILLISECONDS);
- MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
if (ObjectUtils.isEmpty(message)) {
if (queue.isEmpty()) {
emptyCount.incrementAndGet();
} else {
emptyCount.set(0);
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
continue;
}
@@ -203,6 +203,7 @@ public class PulsarSource extends AbstractSource {
break;
}
putIntoQueue(sourceData);
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_TOTAL_LEN);
consumer.acknowledge(message);
if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_INTERVAL_MS) {
@@ -260,7 +261,7 @@ public class PulsarSource extends AbstractSource {
@Override
public Message read() {
- PulsarSource.SourceData sourceData = null;
+ SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -274,7 +275,7 @@ public class PulsarSource extends AbstractSource {
return finalMsg;
}
- private Message createMessage(PulsarSource.SourceData sourceData) {
+ private Message createMessage(SourceData sourceData) {
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY,
DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);