This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95bd84f5ce [INLONG-9289][Agent] Improve the completion judgment logic
of collecting instances (#9290)
95bd84f5ce is described below
commit 95bd84f5ce41aa07fe1961c932fca1a8f25c4605
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Nov 15 10:50:17 2023 +0800
[INLONG-9289][Agent] Improve the completion judgment logic of collecting
instances (#9290)
---
.../inlong/agent/plugin/instance/FileInstance.java | 12 ++++++---
.../agent/plugin/sinks/filecollect/ProxySink.java | 30 ++++++++++++----------
2 files changed, 26 insertions(+), 16 deletions(-)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 5ef904e200..13aef15f9d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -48,9 +48,11 @@ public class FileInstance extends Instance {
private InstanceProfile profile;
public static final int CORE_THREAD_SLEEP_TIME = 1;
private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
+ private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
private InstanceManager instanceManager;
private volatile boolean running = false;
private volatile boolean inited = false;
+ private volatile int checkFinishCount = 0;
@Override
public void init(Object srcManager, InstanceProfile srcProfile) {
@@ -103,11 +105,15 @@ public class FileInstance extends Instance {
Message msg = source.read();
if (msg == null) {
if (source.sourceFinish() && sink.sinkFinish()) {
- handleReadEnd();
- break;
+ checkFinishCount++;
+ if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
+ handleReadEnd();
+ break;
+ }
} else {
- AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+ checkFinishCount = 0;
}
+ AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
} else {
sink.write(msg);
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index be7abce152..922400025e 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -36,7 +36,6 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
@@ -49,8 +48,8 @@ public class ProxySink extends AbstractSink {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProxySink.class);
private final int WRITE_FAILED_WAIT_TIME_MS = 10;
private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
- private final Integer FINISH_READ_MAX_COUNT = 30;
- private static AtomicLong index = new AtomicLong(0);
+ private final Integer NO_WRITE_WAIT_AT_LEAST_MS = 5 * 1000;
+ private final Integer SINK_FINISH_AT_LEAST_COUNT = 5;
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
1L, TimeUnit.SECONDS,
@@ -62,18 +61,17 @@ public class ProxySink extends AbstractSink {
private volatile boolean shutdown = false;
private volatile boolean running = false;
private volatile boolean inited = false;
- private volatile int readEndCount = 0;
+ private volatile long lastWriteTime = 0;
+ private volatile long checkSinkFinishCount = 0;
public ProxySink() {
}
@Override
public void write(Message message) {
- if (message == null) {
- return;
- }
boolean suc = false;
- while (running && !suc) {
+ while (!shutdown && !suc) {
+ lastWriteTime = AgentUtils.getCurrentTime();
suc = putInCache(message);
if (!suc) {
AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
@@ -146,7 +144,7 @@ public class ProxySink extends AbstractSink {
try {
SenderMessage senderMessage = cache.fetchSenderMessage();
if (senderMessage != null) {
- readEndCount = 0;
+ checkSinkFinishCount = 0;
senderManager.sendBatch(senderMessage);
if (AgentUtils.getCurrentTime() - lastPrintTime >
TimeUnit.SECONDS.toMillis(1)) {
lastPrintTime = AgentUtils.getCurrentTime();
@@ -156,8 +154,11 @@ public class ProxySink extends AbstractSink {
profile.getInstanceId(),
senderMessage.getDataTime());
}
+ }
+ if (noWriteLongEnough() && senderManager.sendFinished()) {
+ checkSinkFinishCount++;
} else {
- readEndCount++;
+ checkSinkFinishCount = 0;
}
} catch (Exception ex) {
LOGGER.error("error caught", ex);
@@ -211,15 +212,18 @@ public class ProxySink extends AbstractSink {
*/
@Override
public boolean sinkFinish() {
- if (finishReadLog() && senderManager.sendFinished()) {
+ if (noWriteLongEnough() && sinkFinishLongEnough()) {
return true;
} else {
return false;
}
}
- public boolean finishReadLog() {
- return readEndCount > FINISH_READ_MAX_COUNT;
+ public boolean noWriteLongEnough() {
+ return AgentUtils.getCurrentTime() - lastWriteTime >
NO_WRITE_WAIT_AT_LEAST_MS;
}
+ public boolean sinkFinishLongEnough() {
+ return checkSinkFinishCount > SINK_FINISH_AT_LEAST_COUNT;
+ }
}