This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 76a621696 [INLONG-8026][Agent] Improve the Agent performance (#8027)
76a621696 is described below
commit 76a621696d7ee9f5f747923dfbd5802189cc533e
Author: doleyzi <[email protected]>
AuthorDate: Mon May 15 19:16:35 2023 +0800
[INLONG-8026][Agent] Improve the Agent performance (#8027)
* Improve the Agent performance
* Add InLong Changelog
* Add InLong Changelog
---------
Co-authored-by: doleyzi <[email protected]>
---
CHANGES.md | 4 +-
.../inlong/agent/message/BatchProxyMessage.java | 1 -
.../inlong/agent/message/PackProxyMessage.java | 54 ++++---
.../org/apache/inlong/agent/core/AgentManager.java | 28 ++--
.../apache/inlong/agent/core/conf/ConfigJetty.java | 17 +--
.../apache/inlong/agent/core/job/JobManager.java | 72 +++++----
.../inlong/agent/core/trigger/TriggerManager.java | 31 ++--
.../agent/plugin/fetcher/ManagerFetcher.java | 168 ++++++++-------------
.../inlong/agent/plugin/sinks/ProxySink.java | 90 +++++++----
.../inlong/agent/plugin/sinks/PulsarSink.java | 3 +-
.../agent/plugin/sources/TextFileSource.java | 16 --
.../sources/reader/file/MonitorTextFile.java | 15 +-
.../org/apache/inlong/agent/plugin/MiniAgent.java | 7 +-
.../agent/plugin/trigger/TestTriggerManager.java | 2 +-
14 files changed, 255 insertions(+), 253 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bac536234..48c92680f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -27,7 +27,9 @@
|:-----------------------------------------------------------:|:---------------------------------------------------------------------------------------------------------------------------------------------------|
| [INLONG-7847](https://github.com/apache/inlong/issues/7847) | [Bug][Agent]
Failed to create MySQL reader
|
| [INLONG-7783](https://github.com/apache/inlong/issues/7783) |
[Feature][Agent] Support sink data tor Kafka
|
-| [INLONG-7752](https://github.com/apache/inlong/issues/7752) | [Bug][Agent]
PulsarSink threadPool throw reject exception
|
+| [INLONG-7752](https://github.com/apache/inlong/issues/7752) | [Bug][Agent]
PulsarSink threadPool throw reject exception
|
+| [INLONG-7976](https://github.com/apache/inlong/issues/7976) | [Bug][Agent]
The data collected by the agent is incomplete
|
+| [INLONG-8026](https://github.com/apache/inlong/issues/8026) |
[Improve][Agent] Improve the Agent performance
|
### DataProxy
| ISSUE | Summary
|
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
index 39be15437..c34d8eaf5 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
@@ -41,7 +41,6 @@ public class BatchProxyMessage {
private List<byte[]> dataList;
private long dataTime;
private Map<String, String> extraMap;
- private boolean isSyncSend;
public InLongMsg getInLongMsg() {
InLongMsg message = InLongMsg.newInLongMsg(true);
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index 6eb54cd1c..61b233abe 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -17,30 +17,28 @@
package org.apache.inlong.agent.message;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.common.msg.AttributeConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicLong;
-
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
-import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
import static org.apache.inlong.common.msg.AttributeConstants.DATA_TIME;
import static org.apache.inlong.common.msg.AttributeConstants.MESSAGE_TOPIC;
import static org.apache.inlong.common.msg.AttributeConstants.STREAM_ID;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Handle List of BusMessage, which belong to the same stream id.
*/
@@ -58,7 +56,6 @@ public class PackProxyMessage {
// streamId -> list of proxyMessage
private final LinkedBlockingQueue<ProxyMessage> messageQueue;
private final AtomicLong queueSize = new AtomicLong(0);
- private boolean syncSend;
private int currentSize;
/**
* extra map used when sending to dataproxy
@@ -79,9 +76,7 @@ public class PackProxyMessage {
this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
this.groupId = groupId;
this.streamId = streamId;
- // handle syncSend flag
- this.syncSend = jobConf.getBoolean(PROXY_SEND_SYNC, false);
- extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND,
String.valueOf(syncSend));
+ extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
}
public void generateExtraMap(String dataKey) {
@@ -106,18 +101,21 @@ public class PackProxyMessage {
/**
* Add proxy message to cache, proxy message should belong to the same
stream id.
*/
- public void addProxyMessage(ProxyMessage message) {
+ public boolean addProxyMessage(ProxyMessage message) {
assert streamId.equals(message.getInlongStreamId());
try {
if (queueIsFull()) {
LOGGER.warn("message queue is greater than {}, stop adding
message, "
+ "maybe proxy get stuck", maxQueueNumber);
+ return false;
}
messageQueue.put(message);
queueSize.addAndGet(message.getBody().length);
+ return true;
} catch (Exception ex) {
LOGGER.error("exception caught", ex);
}
+ return false;
}
/**
@@ -144,8 +142,19 @@ public class PackProxyMessage {
while (!messageQueue.isEmpty()) {
// pre check message size
ProxyMessage peekMessage = messageQueue.peek();
- if (peekMessage == null
- || resultBatchSize + peekMessage.getBody().length >
maxPackSize) {
+ if (peekMessage == null) {
+ break;
+ }
+
+ // if the message size is greater than max pack size,should
drop it.
+ int peekMessageLength = peekMessage.getBody().length;
+ if (peekMessageLength > maxPackSize) {
+ LOGGER.warn("message size is {}, greater than max pack
size {}, drop it!",
+ peekMessage.getBody().length, maxPackSize);
+ messageQueue.remove();
+ break;
+ }
+ if (resultBatchSize + peekMessageLength > maxPackSize) {
break;
}
ProxyMessage message = messageQueue.remove();
@@ -159,8 +168,7 @@ public class PackProxyMessage {
}
// make sure result is not empty.
if (!result.isEmpty()) {
- return new BatchProxyMessage(jobId, groupId, streamId, result,
AgentUtils.getCurrentTime(), extraMap,
- syncSend);
+ return new BatchProxyMessage(jobId, groupId, streamId, result,
AgentUtils.getCurrentTime(), extraMap);
}
}
return null;
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 42e058823..a571ee9d9 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -17,6 +17,16 @@
package org.apache.inlong.agent.core;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
@@ -36,16 +46,6 @@ import org.apache.inlong.agent.db.TriggerProfileDb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
-
/**
* Agent Manager, the bridge for job manager, task manager, db e.t.c it
manages agent level operations and communicates
* with outside system.
@@ -196,11 +196,17 @@ public class AgentManager extends AbstractDaemon {
public void start() throws Exception {
LOGGER.info("starting agent manager");
agentConfMonitor.submit(startHotConfReplace());
+ LOGGER.info("starting job manager");
jobManager.start();
+ LOGGER.info("starting trigger manager");
triggerManager.start();
+ LOGGER.info("starting task manager");
taskManager.start();
+ LOGGER.info("starting heartbeat manager");
heartbeatManager.start();
+ LOGGER.info("starting task position manager");
taskPositionManager.start();
+ LOGGER.info("starting read job from local");
// read job profiles from local
List<JobProfile> profileList = localProfile.readFromLocal();
for (JobProfile profile : profileList) {
@@ -215,9 +221,11 @@ public class AgentManager extends AbstractDaemon {
jobManager.submitFileJobProfile(profile);
}
}
+ LOGGER.info("starting fetcher");
if (fetcher != null) {
fetcher.start();
}
+ LOGGER.info("starting agent manager end");
}
/**
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index d29cecdb7..cf9675e05 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -17,6 +17,10 @@
package org.apache.inlong.agent.core.conf;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
+
+import java.io.Closeable;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.TriggerProfile;
@@ -32,11 +36,6 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-
-import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
-
/**
* start http server and get job/agent config via http
*/
@@ -88,7 +87,7 @@ public class ConfigJetty implements Closeable {
// trigger job is a special kind of job
if (jobProfile.hasKey(JOB_TRIGGER)) {
triggerManager.submitTrigger(
- TriggerProfile.parseJsonStr(jobProfile.toJsonStr()));
+ TriggerProfile.parseJsonStr(jobProfile.toJsonStr()),
true);
} else {
TaskTypeEnum taskType = TaskTypeEnum
.getTaskType(jobProfile.getInt(JOB_SOURCE_TYPE));
@@ -99,7 +98,7 @@ public class ConfigJetty implements Closeable {
case KAFKA:
case BINLOG:
case SQL:
- jobManager.submitJobProfile(jobProfile, true);
+ jobManager.submitJobProfile(jobProfile, true, true);
break;
default:
LOGGER.error("source type not supported {}", taskType);
@@ -123,9 +122,9 @@ public class ConfigJetty implements Closeable {
public void deleteJobConf(JobProfile jobProfile) {
if (jobProfile != null) {
if (jobProfile.hasKey(JOB_TRIGGER)) {
-
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId());
+
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(),
false);
} else {
- jobManager.deleteJob(jobProfile.getInstanceId());
+ jobManager.deleteJob(jobProfile.getInstanceId(), false);
}
}
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index c4171d95f..f91285b2e 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -17,6 +17,27 @@
package org.apache.inlong.agent.core.job;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
+import static
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
+import static
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
+import static org.apache.inlong.agent.constant.AgentConstants.JOB_VERSION;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
+import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
+import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -34,27 +55,6 @@ import org.apache.inlong.common.metric.MetricRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL;
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_DB_CACHE_TIME;
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_JOB_NUMBER_LIMIT;
-import static
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_CHECK_INTERVAL;
-import static
org.apache.inlong.agent.constant.AgentConstants.JOB_DB_CACHE_TIME;
-import static org.apache.inlong.agent.constant.AgentConstants.JOB_NUMBER_LIMIT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID_PREFIX;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-import static org.apache.inlong.agent.constant.JobConstants.SQL_JOB_ID;
-import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
-
/**
* JobManager maintains lots of jobs, and communicate between server and task
manager.
*/
@@ -143,7 +143,7 @@ public class JobManager extends AbstractDaemon {
* @param profile job profile.
*/
public boolean submitFileJobProfile(JobProfile profile) {
- return submitJobProfile(profile, false);
+ return submitJobProfile(profile, false, true);
}
/**
@@ -151,7 +151,7 @@ public class JobManager extends AbstractDaemon {
*
* @param profile job profile.
*/
- public boolean submitJobProfile(JobProfile profile, boolean singleJob) {
+ public boolean submitJobProfile(JobProfile profile, boolean singleJob,
boolean isNewJob) {
if (!isJobValid(profile)) {
return false;
}
@@ -161,8 +161,19 @@ public class JobManager extends AbstractDaemon {
} else {
profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX,
jobId, index.incrementAndGet()));
}
- LOGGER.info("submit job profile {}", profile.toJsonStr());
- getJobConfDb().storeJobFirstTime(profile);
+ LOGGER.info("submit job profile {} isNewJob {}", profile.toJsonStr(),
isNewJob);
+ if (isNewJob) {
+ jobProfileDb.storeJobFirstTime(profile);
+ } else {
+ JobProfile jobFromDb =
jobProfileDb.getJobById(profile.getInstanceId());
+ if (jobFromDb != null) {
+ jobFromDb.set(JOB_VERSION, profile.get(JOB_VERSION));
+ profile = jobFromDb;
+ } else {
+ LOGGER.info("submit job final profile null");
+ }
+ }
+ LOGGER.info("submit job final profile {}", profile.toJsonStr());
addJob(new Job(profile));
return true;
}
@@ -192,13 +203,15 @@ public class JobManager extends AbstractDaemon {
*
* @param jobInstancId
*/
- public boolean deleteJob(String jobInstancId) {
+ public boolean deleteJob(String jobInstancId, boolean isFrozen) {
LOGGER.info("start to delete job, job id set {}", jobs.keySet());
JobWrapper jobWrapper = jobs.remove(jobInstancId);
if (jobWrapper != null) {
- LOGGER.info("delete job instance with job id {}", jobInstancId);
+ LOGGER.info("delete job instance with job id {} isFrozen {}",
jobInstancId, isFrozen);
jobWrapper.cleanup();
- getJobConfDb().deleteJob(jobInstancId);
+ if (!isFrozen) {
+ jobProfileDb.deleteJob(jobInstancId);
+ }
return true;
}
return true;
@@ -208,7 +221,7 @@ public class JobManager extends AbstractDaemon {
* start all accepted jobs.
*/
private void startJobs() {
- List<JobProfile> profileList = getJobConfDb().getRestartJobs();
+ List<JobProfile> profileList = jobProfileDb.getRestartJobs();
for (JobProfile profile : profileList) {
LOGGER.info("init starting job from db {}", profile.toJsonStr());
addJob(new Job(profile));
@@ -283,6 +296,7 @@ public class JobManager extends AbstractDaemon {
* @param jobId job id
*/
public void markJobAsFailed(String jobId) {
+ LOGGER.info("markJobAsFailed {}", jobId);
JobWrapper wrapper = jobs.remove(jobId);
if (wrapper != null) {
LOGGER.info("job instance {} is failed", jobId);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index e73f6b8d6..0f8c4a077 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -17,7 +17,15 @@
package org.apache.inlong.agent.core.trigger;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
+import static
org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
+
import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
@@ -33,15 +41,6 @@ import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_TRIGGER_MAX_RUNNING_NUM;
-import static
org.apache.inlong.agent.constant.AgentConstants.TRIGGER_MAX_RUNNING_NUM;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_ID;
-
/**
* manager for triggers.
*/
@@ -76,7 +75,7 @@ public class TriggerManager extends AbstractDaemon {
Trigger trigger = (Trigger) triggerClass.newInstance();
String triggerId = triggerProfile.get(JOB_ID);
if (triggerMap.containsKey(triggerId)) {
- deleteTrigger(triggerId);
+ deleteTrigger(triggerId, false);
LOGGER.warn("trigger {} is running, stop it", triggerId);
}
triggerMap.put(triggerId, trigger);
@@ -101,7 +100,7 @@ public class TriggerManager extends AbstractDaemon {
*
* @param triggerProfile trigger profile
*/
- public void submitTrigger(TriggerProfile triggerProfile) {
+ public void submitTrigger(TriggerProfile triggerProfile, boolean isNewJob)
{
// make sure all required key exists.
if (!triggerProfile.allRequiredKeyExist() || this.triggerMap.size() >
maxRunningNum) {
throw new IllegalArgumentException(
@@ -117,7 +116,7 @@ public class TriggerManager extends AbstractDaemon {
LOGGER.info("submit trigger {}", triggerProfile.toJsonStr());
// This action must be done before saving in db, because the
job.instance.id is needed for the next recovery
- manager.getJobManager().submitJobProfile(triggerProfile, true);
+ manager.getJobManager().submitJobProfile(triggerProfile, true,
isNewJob);
triggerProfileDB.storeTrigger(triggerProfile);
restoreTrigger(triggerProfile);
}
@@ -129,7 +128,7 @@ public class TriggerManager extends AbstractDaemon {
*
* @param triggerId trigger profile.
*/
- public void deleteTrigger(String triggerId) {
+ public void deleteTrigger(String triggerId, boolean isFrozen) {
// repeat check
if (!triggerProfileDB.getTriggers().stream()
.anyMatch(profile ->
profile.getTriggerId().equals(triggerId))) {
@@ -139,7 +138,7 @@ public class TriggerManager extends AbstractDaemon {
LOGGER.info("delete trigger {}", triggerId);
Trigger trigger = triggerMap.remove(triggerId);
if (trigger != null) {
-
manager.getJobManager().deleteJob(trigger.getTriggerProfile().getInstanceId());
+
manager.getJobManager().deleteJob(trigger.getTriggerProfile().getInstanceId(),
isFrozen);
trigger.destroy();
}
triggerProfileDB.deleteTrigger(triggerId);
@@ -157,6 +156,10 @@ public class TriggerManager extends AbstractDaemon {
if (profile != null) {
Map<String, JobWrapper> jobWrapperMap =
manager.getJobManager().getJobs();
JobWrapper job =
jobWrapperMap.get(trigger.getTriggerProfile().getInstanceId());
+ if (job == null) {
+ LOGGER.error("job {} should not be null",
trigger.getTriggerProfile().getInstanceId());
+ return;
+ }
String subTaskFile =
profile.getOrDefault(JobConstants.JOB_DIR_FILTER_PATTERNS, "");
Preconditions.checkArgument(StringUtils.isNotBlank(subTaskFile),
String.format("Trigger %s fetched task
file should not be null.", s));
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index c8f2168c7..44ec85ad5 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -17,55 +17,9 @@
package org.apache.inlong.agent.plugin.fetcher;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonArray;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import org.apache.inlong.agent.cache.LocalFileCache;
-import org.apache.inlong.agent.common.AbstractDaemon;
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.conf.ProfileFetcher;
-import org.apache.inlong.agent.conf.TriggerProfile;
-import org.apache.inlong.agent.core.AgentManager;
-import org.apache.inlong.agent.db.CommandDb;
-import org.apache.inlong.agent.plugin.Trigger;
-import org.apache.inlong.agent.plugin.utils.PluginUtils;
-import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest;
-import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto;
-import org.apache.inlong.agent.pojo.DbCollectorTaskResult;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.HttpManager;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.db.CommandEntity;
-import org.apache.inlong.common.enums.ManagerOpEnum;
-import org.apache.inlong.common.enums.PullJobTypeEnum;
-import org.apache.inlong.common.pojo.agent.CmdConfig;
-import org.apache.inlong.common.pojo.agent.TaskRequest;
-import org.apache.inlong.common.pojo.agent.TaskResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
import static java.util.Objects.requireNonNull;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HOME;
-import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE;
-import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_CACHE_TIMEOUT;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HOME;
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE;
-import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT;
import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH;
@@ -92,6 +46,43 @@ import static
org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.conf.ProfileFetcher;
+import org.apache.inlong.agent.conf.TriggerProfile;
+import org.apache.inlong.agent.core.AgentManager;
+import org.apache.inlong.agent.db.CommandDb;
+import org.apache.inlong.agent.plugin.Trigger;
+import org.apache.inlong.agent.plugin.utils.PluginUtils;
+import org.apache.inlong.agent.pojo.ConfirmAgentIpRequest;
+import org.apache.inlong.agent.pojo.DbCollectorTaskRequestDto;
+import org.apache.inlong.agent.pojo.DbCollectorTaskResult;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.HttpManager;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.db.CommandEntity;
+import org.apache.inlong.common.enums.ManagerOpEnum;
+import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.pojo.agent.CmdConfig;
+import org.apache.inlong.common.pojo.agent.TaskRequest;
+import org.apache.inlong.common.pojo.agent.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Fetch command from Inlong-Manager
*/
@@ -108,11 +99,9 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
private final String managerIpsCheckUrl;
private final String managerDbCollectorTaskUrl;
private final AgentConfiguration conf;
- private final LocalFileCache localFileCache;
private final String uniqId;
private final AgentManager agentManager;
private final HttpManager httpManager;
- private List<String> managerList;
private String localIp;
private String uuid;
private String clusterName;
@@ -129,7 +118,6 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
managerTaskUrl = buildFileCollectTaskUrl(baseManagerUrl);
managerIpsCheckUrl = buildIpCheckUrl(baseManagerUrl);
managerDbCollectorTaskUrl =
buildDbCollectorGetTaskUrl(baseManagerUrl);
- localFileCache = getLocalFileCache();
uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
clusterName = conf.get(AGENT_CLUSTER_NAME);
this.commandDb = agentManager.getCommandDb();
@@ -144,7 +132,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* build base url for manager according to config
- *
+ * <p>
* example - http://127.0.0.1:8080/inlong/manager/openapi
*/
private String buildBaseUrl() {
@@ -155,7 +143,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* build vip url for manager according to config
- *
+ * <p>
* example -
http://127.0.0.1:8080/inlong/manager/openapi/agent/getManagerIpList
*/
private String buildVipUrl(String baseUrl) {
@@ -164,7 +152,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* build file collect task url for manager according to config
- *
+ * <p>
* example -
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
*/
private String buildFileCollectTaskUrl(String baseUrl) {
@@ -173,7 +161,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* build ip check url for manager according to config
- *
+ * <p>
* example -
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/confirmAgentIp
*/
private String buildIpCheckUrl(String baseUrl) {
@@ -182,7 +170,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* build db collector get task url for manager according to config
- *
+ * <p>
* example -
http://127.0.0.1:8080/inlong/manager/openapi/dbcollector/getTask
*/
private String buildDbCollectorGetTaskUrl(String baseUrl) {
@@ -190,17 +178,6 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
.get(AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH,
DEFAULT_AGENT_MANAGER_DBCOLLECTOR_GETTASK_HTTP_PATH);
}
- /**
- * get localFileCache according to config
- */
- private LocalFileCache getLocalFileCache() {
- Path localStorage = Paths.get(conf.get(AGENT_HOME, DEFAULT_AGENT_HOME),
- conf.get(AGENT_LOCAL_CACHE, DEFAULT_AGENT_LOCAL_CACHE),
"managerList.txt");
- long timeout =
TimeUnit.MINUTES.toMillis(conf.getInt(AGENT_LOCAL_CACHE_TIMEOUT,
- DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT));
- return new LocalFileCache(localStorage.toFile(), timeout);
- }
-
/**
* for manager to get job profiles
*
@@ -215,7 +192,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* request manager to get manager vipUrl list, and store it to local file
*/
- public void requestTdmList() {
+ public List<String> requestTdmList() {
JsonObject result =
getResultData(httpManager.doSendPost(managerVipUrl));
JsonArray data =
result.get(AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonArray();
List<String> managerIpList = new ArrayList<>();
@@ -223,24 +200,26 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
JsonObject asJsonObject = datum.getAsJsonObject();
managerIpList.add(asJsonObject.get(AGENT_MANAGER_RETURN_PARAM_IP).getAsString());
}
- if (managerIpList.isEmpty()) {
- return;
- }
- localFileCache.writeToCache(String.join(",", managerIpList));
+ return managerIpList;
}
/**
* request manager to get commands, make sure it is not throwing exceptions
*/
public void fetchCommand() {
+ LOGGER.info("fetchCommand start");
List<CommandEntity> unackedCommands = commandDb.getUnackedCommands();
String resultStr = httpManager.doSentPost(managerTaskUrl,
getFetchRequest(unackedCommands));
JsonObject resultData = getResultData(resultStr);
JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
if (element != null) {
+ LOGGER.info("fetchCommand not null {}", resultData);
dealWithFetchResult(GSON.fromJson(element.getAsJsonObject(),
TaskResult.class));
+ } else {
+ LOGGER.info("fetchCommand nothing to do");
}
ackCommands(unackedCommands);
+ LOGGER.info("fetchCommand end");
}
private void ackCommands(List<CommandEntity> unackedCommands) {
@@ -274,7 +253,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
if (profile == null) {
return;
}
- agentManager.getJobManager().submitJobProfile(profile, true);
+ agentManager.getJobManager().submitJobProfile(profile, true, true);
}
/**
@@ -404,14 +383,19 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
try {
switch (requireNonNull(opType)) {
case ACTIVE:
+
agentManager.getTriggerManager().submitTrigger(triggerProfile, false);
+ break;
case ADD:
-
agentManager.getTriggerManager().submitTrigger(triggerProfile);
+
agentManager.getTriggerManager().submitTrigger(triggerProfile, true);
break;
case DEL:
+
agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(),
false);
+ break;
case FROZEN:
-
agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId());
+
agentManager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(),
true);
break;
default:
+ LOGGER.error("can not handle option type {}", opType);
}
} catch (Exception e) {
LOGGER.error("Deal with trigger profile err.", e);
@@ -429,12 +413,16 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
try {
switch (requireNonNull(opType)) {
case ACTIVE:
+ success =
agentManager.getJobManager().submitJobProfile(triggerProfile, true, false);
+ break;
case ADD:
- success =
agentManager.getJobManager().submitJobProfile(triggerProfile, true);
+ success =
agentManager.getJobManager().submitJobProfile(triggerProfile, true, true);
break;
case DEL:
+ success =
agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId(), false);
+ break;
case FROZEN:
- success =
agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId());
+ success =
agentManager.getJobManager().deleteJob(triggerProfile.getTriggerId(), true);
break;
default:
}
@@ -458,31 +446,6 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
return resultData.get(AGENT_MANAGER_RETURN_PARAM_IP).getAsString();
}
- /**
- * fetch manager list, make sure it's not throwing exceptions
- *
- * @param isInitial is initial
- * @param retryTime retry time
- */
- private void fetchTdmList(boolean isInitial, int retryTime) {
- if (retryTime > MAX_RETRY) {
- return;
- }
- try {
- // check local cache time, make sure cache not timeout
- if (!isInitial && !localFileCache.cacheIsExpired()) {
- String result = localFileCache.getCacheInfo();
- managerList = Arrays.stream(result.split(","))
- .map(String::trim)
- .collect(Collectors.toList());
- } else {
- requestTdmList();
- }
- } catch (Exception ex) {
- fetchTdmList(false, retryTime + 1);
- }
- }
-
/**
* thread for profile fetcher.
*
@@ -498,10 +461,6 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
TimeUnit.SECONDS.sleep(AgentUtils.getRandomBySeed(configSleepTime));
// fetch commands from manager
fetchCommand();
-
- // fetch manager list from vip
- fetchTdmList(false, 0);
-
// fetch db collector task from manager
fetchDbCollectTask();
} catch (Throwable ex) {
@@ -527,7 +486,6 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
// when agent start, check local ip and fetch manager ip list;
localIp = fetchLocalIp();
uuid = fetchLocalUuid();
- fetchTdmList(true, 0);
submitWorker(profileFetchThread());
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 090ab2c33..04f51f96f 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -17,6 +17,16 @@
package org.apache.inlong.agent.plugin.sinks;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
@@ -31,15 +41,6 @@ import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-
/**
* sink message data to inlong-dataproxy
*/
@@ -60,36 +61,58 @@ public class ProxySink extends AbstractSink {
@Override
public void write(Message message) {
+ boolean suc = false;
+ while (!suc) {
+ suc = putInCache(message);
+ if (!suc) {
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
+ }
+ }
+ }
+
+ private boolean putInCache(Message message) {
try {
- if (message != null) {
+ if (message == null) {
+ return true;
+ }
+ message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongGroupId);
+ message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
+ extractStreamFromMessage(message, fieldSplitter);
+ if (message instanceof EndMessage) {
+ // increment the count of failed sinks
+ sinkMetric.sinkFailCount.incrementAndGet();
+ return true;
+ }
+ AtomicBoolean suc = new AtomicBoolean(false);
+ ProxyMessage proxyMessage = new ProxyMessage(message);
+ // add proxy message to cache.
+ cache.compute(proxyMessage.getBatchKey(),
+ (s, packProxyMessage) -> {
+ if (packProxyMessage == null) {
+ packProxyMessage = new
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId,
+ proxyMessage.getInlongStreamId());
+
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
+ }
+ // add message to package proxy
+
suc.set(packProxyMessage.addProxyMessage(proxyMessage));
+ return packProxyMessage;
+ });
+ if (suc.get()) {
+ // semaphore should be acquired only when the message was put
in cache successfully
senderManager.acquireSemaphore(1);
- message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID,
inlongGroupId);
- message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
- extractStreamFromMessage(message, fieldSplitter);
- if (!(message instanceof EndMessage)) {
- ProxyMessage proxyMessage = new ProxyMessage(message);
- // add proxy message to cache.
- cache.compute(proxyMessage.getBatchKey(),
- (s, packProxyMessage) -> {
- if (packProxyMessage == null) {
- packProxyMessage = new
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId,
- proxyMessage.getInlongStreamId());
-
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
- }
- // add message to package proxy
- packProxyMessage.addProxyMessage(proxyMessage);
- return packProxyMessage;
- });
- // increment the count of successful sinks
- sinkMetric.sinkSuccessCount.incrementAndGet();
- }
+ // increment the count of successful sinks
+ sinkMetric.sinkSuccessCount.incrementAndGet();
+ } else {
+ // increment the count of failed sinks
+ sinkMetric.sinkFailCount.incrementAndGet();
}
+ return suc.get();
} catch (Exception e) {
- sinkMetric.sinkFailCount.incrementAndGet();
LOGGER.error("write message to Proxy sink error", e);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
}
+ return false;
}
/**
@@ -125,11 +148,12 @@ public class ProxySink extends AbstractSink {
}
});
- AgentUtils.silenceSleepInMs(batchFlushInterval);
} catch (Exception ex) {
LOGGER.error("error caught", ex);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
t);
+ } finally {
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
}
}
};
@@ -145,6 +169,7 @@ public class ProxySink extends AbstractSink {
senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
try {
senderManager.addMessageSender();
+ senderManager.Start();
} catch (Throwable ex) {
LOGGER.error("error while init sender for group id {}",
inlongGroupId);
ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
@@ -161,6 +186,7 @@ public class ProxySink extends AbstractSink {
}
shutdown = true;
executorService.shutdown();
+ senderManager.Stop();
}
/**
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index bf4aa6f50..599a7c46b 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -234,11 +234,12 @@ public class PulsarSink extends AbstractSink {
}
}
});
- AgentUtils.silenceSleepInMs(batchFlushInterval);
} catch (Exception ex) {
LOGGER.error("error caught", ex);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
t);
+ } finally {
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
}
}
};
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 032deebb3..cf2725035 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -18,8 +18,6 @@
package org.apache.inlong.agent.plugin.sources;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.DataCollectType;
-import org.apache.inlong.agent.constant.JobConstants;
import org.apache.inlong.agent.plugin.Reader;
import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
import org.apache.inlong.agent.plugin.sources.reader.file.TriggerFileReader;
@@ -29,9 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.LineNumberReader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -84,17 +79,6 @@ public class TextFileSource extends AbstractSource {
private int getStartPosition(JobProfile jobConf, File file) {
int seekPosition;
- if (jobConf.hasKey(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE) &&
DataCollectType.INCREMENT
-
.equalsIgnoreCase(jobConf.get(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE))) {
- try (LineNumberReader lineNumberReader = new LineNumberReader(new
FileReader(file.getPath()))) {
- lineNumberReader.skip(Long.MAX_VALUE);
- seekPosition = lineNumberReader.getLineNumber();
- return seekPosition;
- } catch (IOException ex) {
- LOGGER.error("get position error, file absolute path: {}",
file.getAbsolutePath());
- throw new RuntimeException(ex);
- }
- }
seekPosition = jobConf.getInt(file.getAbsolutePath() +
POSITION_SUFFIX, 0);
return seekPosition;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
index 6fcbf481c..526c17d27 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/MonitorTextFile.java
@@ -42,7 +42,9 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_MONITOR_INT
public final class MonitorTextFile {
private static final Logger LOGGER =
LoggerFactory.getLogger(MonitorTextFile.class);
- // monitor thread pool
+ /**
+ * monitor thread pool
+ */
private static final ThreadPoolExecutor EXECUTOR_SERVICE = new
ThreadPoolExecutor(
0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
@@ -165,7 +167,7 @@ public final class MonitorTextFile {
}
/**
- * reset the position and bytePositionreset the position and
bytePosition
+ * Reset the position and bytePosition
*/
private void resetPosition() {
LOGGER.info("reset position {}", fileReaderOperator.file.toPath());
@@ -182,18 +184,15 @@ public final class MonitorTextFile {
/**
* Determine whether the inode has changed
*
- * @param currentFileKey
- * @return
+ * @param currentFileKey current file key
+ * @return true if the inode changed, otherwise false
*/
private boolean isInodeChanged(String currentFileKey) {
if (fileReaderOperator.fileKey == null || currentFileKey == null) {
return false;
}
- if (fileReaderOperator.fileKey.equals(currentFileKey)) {
- return false;
- }
- return true;
+ return !fileReaderOperator.fileKey.equals(currentFileKey);
}
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
index 77c2efadc..4556a1474 100755
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/MiniAgent.java
@@ -93,18 +93,19 @@ public class MiniAgent {
}
public void submitTrigger(TriggerProfile triggerProfile) {
- manager.getTriggerManager().submitTrigger(triggerProfile);
+ manager.getTriggerManager().submitTrigger(triggerProfile, true);
triggerProfileCache.add(triggerProfile);
}
public void cleanupJobs() {
- jobProfileCache.forEach(jobProfile ->
manager.getJobManager().deleteJob(jobProfile.getInstanceId()));
+ jobProfileCache.forEach(jobProfile ->
manager.getJobManager().deleteJob(jobProfile.getInstanceId(), false));
jobProfileCache.clear();
}
public void cleanupTriggers() {
triggerProfileCache
- .forEach(triggerProfile ->
manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId()));
+ .forEach(triggerProfile ->
manager.getTriggerManager().deleteTrigger(triggerProfile.getTriggerId(),
+ false));
triggerProfileCache.clear();
}
}
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
index fc5cd75e2..3d41ef0a4 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/trigger/TestTriggerManager.java
@@ -163,7 +163,7 @@ public class TestTriggerManager {
});
// shutdown trigger
-
agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId());
+
agent.getManager().getTriggerManager().deleteTrigger(triggerProfile1.getTriggerId(),
false);
await().atMost(10, TimeUnit.SECONDS).until(() ->
trigger.getWatchers().size() == 0);
TestUtils.deleteFile(WATCH_FOLDER.getRoot().getAbsolutePath() +
"/1.log");
}