This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 702385afe9 [INLONG-9267][Agent] Fix bug: data loss when there are many files to read once (#9268)
702385afe9 is described below
commit 702385afe927ed229dec4a983281789782efa622
Author: justinwwhuang <[email protected]>
AuthorDate: Sun Nov 12 14:56:53 2023 +0800
[INLONG-9267][Agent] Fix bug: data loss when there are many files to read once (#9268)
---
.../inlong/agent/plugin/sinks/filecollect/ProxySink.java | 15 ++++++++++++++-
.../inlong/agent/plugin/sources/LogFileSource.java | 16 ++++++++++++----
.../inlong/agent/plugin/sources/TestLogFileSource.java | 6 ++++--
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index 806612e661..be7abce152 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -49,6 +49,7 @@ public class ProxySink extends AbstractSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxySink.class);
private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+ private final Integer FINISH_READ_MAX_COUNT = 30;
private static AtomicLong index = new AtomicLong(0);
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
@@ -61,6 +62,7 @@ public class ProxySink extends AbstractSink {
private volatile boolean shutdown = false;
private volatile boolean running = false;
private volatile boolean inited = false;
+ private volatile int readEndCount = 0;
public ProxySink() {
}
@@ -144,6 +146,7 @@ public class ProxySink extends AbstractSink {
try {
SenderMessage senderMessage = cache.fetchSenderMessage();
if (senderMessage != null) {
+ readEndCount = 0;
senderManager.sendBatch(senderMessage);
if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
lastPrintTime = AgentUtils.getCurrentTime();
@@ -153,6 +156,8 @@ public class ProxySink extends AbstractSink {
profile.getInstanceId(),
senderMessage.getDataTime());
}
+ } else {
+ readEndCount++;
}
} catch (Exception ex) {
LOGGER.error("error caught", ex);
@@ -206,7 +211,15 @@ public class ProxySink extends AbstractSink {
*/
@Override
public boolean sinkFinish() {
- return cache.isEmpty() && senderManager.sendFinished();
+ if (finishReadLog() && senderManager.sendFinished()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean finishReadLog() {
+ return readEndCount > FINISH_READ_MAX_COUNT;
}
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 7b94e3f068..089f0fee75 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -132,6 +132,7 @@ public class LogFileSource extends AbstractSource {
private volatile boolean running = false;
public LogFileSource() {
+ OffsetManager.init();
}
@Override
@@ -175,7 +176,7 @@ public class LogFileSource extends AbstractSource {
}
private long getInitLineOffset(boolean isIncrement, String taskId, String
instanceId, String inodeInfo) {
- OffsetProfile offsetProfile = OffsetManager.init().getOffset(taskId,
instanceId);
+ OffsetProfile offsetProfile =
OffsetManager.getInstance().getOffset(taskId, instanceId);
int fileLineCount = getRealLineCount(instanceId);
long offset = 0;
if (offsetProfile != null &&
offsetProfile.getInodeInfo().compareTo(inodeInfo) == 0) {
@@ -334,9 +335,8 @@ public class LogFileSource extends AbstractSource {
}
if (sourceData == null) {
return null;
- } else {
-
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length());
}
+ MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT,
sourceData.data.length());
Message finalMsg = createMessage(sourceData);
return finalMsg;
}
@@ -377,7 +377,7 @@ public class LogFileSource extends AbstractSource {
suc = MemoryManager.getInstance().tryAcquire(permitName,
permitLen);
if (!suc) {
MemoryManager.getInstance().printDetail(permitName, "log file
source");
- if (!isInodeChanged() || !isRunnable()) {
+ if (isInodeChanged() || !isRunnable()) {
return false;
}
AgentUtils.silenceSleepInSeconds(1);
@@ -530,6 +530,14 @@ public class LogFileSource extends AbstractSource {
@Override
public boolean sourceFinish() {
+ if (finishReadLog() && queue.isEmpty()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public boolean finishReadLog() {
return readEndCount > FINISH_READ_MAX_COUNT;
}
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
index eb9c8ccf89..b2a3b605b7 100644
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java
@@ -101,7 +101,7 @@ public class TestLogFileSource {
srcLen += check[i].getBytes(StandardCharsets.UTF_8).length;
}
LogFileSource source = getSource();
- await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
source.finishReadLog());
int cnt = 0;
int leftBeforeRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
Assert.assertTrue(leftBeforeRead + srcLen ==
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
@@ -114,6 +114,7 @@ public class TestLogFileSource {
msg = source.read();
cnt++;
}
+ await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
source.destroy();
Assert.assertTrue(cnt == 3);
Assert.assertTrue(srcLen == readLen);
@@ -123,10 +124,11 @@ public class TestLogFileSource {
private void testCleanQueue() {
LogFileSource source = getSource();
- await().atMost(2, TimeUnit.SECONDS).until(() -> source.sourceFinish());
+ await().atMost(2, TimeUnit.SECONDS).until(() ->
source.finishReadLog());
for (int i = 0; i < 2; i++) {
source.read();
}
+ Assert.assertTrue(!source.sourceFinish());
source.destroy();
int leftAfterRead =
MemoryManager.getInstance().getLeft(AGENT_GLOBAL_READER_QUEUE_PERMIT);
Assert.assertTrue(leftAfterRead ==
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);