This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4f58731 [INLONG-3304][Agent] Agent reader cost too much cpu (#3305)
4f58731 is described below
commit 4f58731ce691a3410edbaf835c515c47a5a61442
Author: Schnapps <[email protected]>
AuthorDate: Wed Mar 23 09:45:03 2022 +0800
[INLONG-3304][Agent] Agent reader cost too much cpu (#3305)
---
.../main/java/org/apache/inlong/agent/core/task/TaskWrapper.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
index ed1740d..8041c0c 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskWrapper.java
@@ -17,6 +17,9 @@
package org.apache.inlong.agent.core.task;
+import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
+
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
@@ -32,6 +35,7 @@ import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.state.AbstractStateWrapper;
import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +55,7 @@ public class TaskWrapper extends AbstractStateWrapper {
private final int maxRetryTime;
private final int pushMaxWaitTime;
private final int pullMaxWaitTime;
+ private final int readWaitTime;
private ExecutorService executorService;
public TaskWrapper(AgentManager manager, Task task) {
@@ -64,6 +69,7 @@ public class TaskWrapper extends AbstractStateWrapper {
AgentConstants.TASK_PUSH_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PUSH_MAX_SECOND);
pullMaxWaitTime = conf.getInt(
AgentConstants.TASK_PULL_MAX_SECOND,
AgentConstants.DEFAULT_TASK_PULL_MAX_SECOND);
+ readWaitTime = conf.getInt(JOB_READ_WAIT_TIMEOUT,
DEFAULT_JOB_READ_WAIT_TIMEOUT);
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -91,6 +97,7 @@ public class TaskWrapper extends AbstractStateWrapper {
message = task.getReader().read();
}
}
+ AgentUtils.silenceSleepInMs(readWaitTime);
}
LOGGER.info("read end, task exception status is {}, read finish
status is {}", isException(),
task.isReadFinished());