This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 88770263f6 [INLONG-8645][Agent] Delete the capacity of loading trigger from local file (#8646)
88770263f6 is described below
commit 88770263f66ba7f2bd68dc6ad9044a6541db4ade
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Aug 7 17:13:28 2023 +0800
[INLONG-8645][Agent] Delete the capacity of loading trigger from local file (#8646)
---
.../apache/inlong/agent/constant/JobConstants.java | 2 +-
.../org/apache/inlong/agent/core/AgentManager.java | 25 ----------------------
.../apache/inlong/agent/core/conf/ConfigJetty.java | 6 +++---
.../inlong/agent/core/trigger/TriggerManager.java | 4 ++--
.../agent/plugin/fetcher/ManagerFetcher.java | 8 +++----
.../agent/plugin/sources/TextFileSource.java | 4 ++--
.../sources/reader/file/TriggerFileReader.java | 2 +-
.../agent/plugin/trigger/DirectoryTrigger.java | 2 +-
.../inlong/agent/plugin/utils/PluginUtils.java | 2 +-
9 files changed, 15 insertions(+), 40 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 79c1fd4a30..92ac0529f4 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants {
public static final String JOB_MQ_TOPIC = "job.topicInfo";
// File job
- public static final String JOB_TRIGGER = "job.fileJob.trigger";
+ public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger";
public static final String JOB_DIR_FILTER_PATTERN =
"job.fileJob.dir.pattern"; // deprecated
public static final String JOB_DIR_FILTER_PATTERNS =
"job.fileJob.dir.patterns";
public static final String JOB_DIR_FILTER_BLACKLIST =
"job.fileJob.dir.blackList";
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 11f9ce793e..c3e2344b27 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -19,9 +19,7 @@ package org.apache.inlong.agent.core;
import org.apache.inlong.agent.common.AbstractDaemon;
import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.conf.ProfileFetcher;
-import org.apache.inlong.agent.conf.TriggerProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.conf.ConfigJetty;
import org.apache.inlong.agent.core.job.JobManager;
@@ -31,7 +29,6 @@ import org.apache.inlong.agent.core.trigger.TriggerManager;
import org.apache.inlong.agent.db.CommandDb;
import org.apache.inlong.agent.db.Db;
import org.apache.inlong.agent.db.JobProfileDb;
-import org.apache.inlong.agent.db.LocalProfile;
import org.apache.inlong.agent.db.TriggerProfileDb;
import org.slf4j.Logger;
@@ -39,14 +36,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.reflect.Constructor;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
-import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
-
/**
* Agent Manager, the bridge for job manager, task manager, db e.t.c it
manages agent level operations and communicates
* with outside system.
@@ -63,7 +55,6 @@ public class AgentManager extends AbstractDaemon {
private final AgentConfiguration conf;
private final ExecutorService agentConfMonitor;
private final Db db;
- private final LocalProfile localProfile;
private final CommandDb commandDb;
private final JobProfileDb jobProfileDb;
// jetty for config operations via http.
@@ -75,8 +66,6 @@ public class AgentManager extends AbstractDaemon {
this.db = initDb();
commandDb = new CommandDb(db);
jobProfileDb = new JobProfileDb(db);
- String parentConfPath = conf.get(AGENT_CONF_PARENT, DEFAULT_AGENT_CONF_PARENT);
- localProfile = new LocalProfile(parentConfPath);
triggerManager = new TriggerManager(this, new TriggerProfileDb(db));
jobManager = new JobManager(this, jobProfileDb);
taskManager = new TaskManager(this);
@@ -208,20 +197,6 @@ public class AgentManager extends AbstractDaemon {
LOGGER.info("starting task position manager");
positionManager.start();
LOGGER.info("starting read job from local");
- // read job profiles from local
- List<JobProfile> profileList = localProfile.readFromLocal();
- for (JobProfile profile : profileList) {
- if (profile.hasKey(JOB_TRIGGER)) {
- TriggerProfile triggerProfile = TriggerProfile.parseJobProfile(profile);
- // there is no need to store this profile in triggerDB, because
- // this profile comes from local file.
- triggerManager.restoreTrigger(triggerProfile);
- } else {
- // job db store instance info, so it's suitable to use submitJobProfile
- // to store instance into job db.
- jobManager.submitFileJobProfile(profile);
- }
- }
LOGGER.info("starting fetcher");
if (fetcher != null) {
fetcher.start();
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index 34e40f8fff..b62413b032 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
/**
* start http server and get job/agent config via http
@@ -87,7 +87,7 @@ public class ConfigJetty implements Closeable {
// store job conf to bdb
if (jobProfile != null) {
// trigger job is a special kind of job
- if (jobProfile.hasKey(JOB_TRIGGER)) {
+ if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
triggerManager.submitTrigger(
TriggerProfile.parseJsonStr(jobProfile.toJsonStr()),
true);
} else {
@@ -123,7 +123,7 @@ public class ConfigJetty implements Closeable {
*/
public void deleteJobConf(JobProfile jobProfile) {
if (jobProfile != null) {
- if (jobProfile.hasKey(JOB_TRIGGER)) {
+ if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(),
false);
} else {
jobManager.deleteJob(jobProfile.getInstanceId(), false);
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index c5c005b83f..122415a683 100755
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -73,7 +73,7 @@ public class TriggerManager extends AbstractDaemon {
*/
public boolean restoreTrigger(TriggerProfile triggerProfile) {
try {
- Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER));
+ Class<?> triggerClass = Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER));
Trigger trigger = (Trigger) triggerClass.newInstance();
String triggerId = triggerProfile.get(JOB_ID);
if (triggerMap.containsKey(triggerId)) {
@@ -171,7 +171,7 @@ public class TriggerManager extends AbstractDaemon {
// necessary to filter the stated monitored file
task.
boolean alreadyExistTask = job.exist(tasks ->
tasks.stream()
- .filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_TRIGGER))
+ .filter(task -> !task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER))
.filter(task -> subTaskFile.equals(
task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, "")))
.findAny().isPresent());
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 350104e698..ac4939b147 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -75,9 +75,9 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
import static
org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
@@ -267,8 +267,8 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
.map(TriggerProfile::getTriggerProfiles)
.forEach(profile -> {
LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
- if (profile.hasKey(JOB_TRIGGER)) {
- dealWithTdmTriggerProfile(profile);
+ if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) {
+ dealWithFileTriggerProfile(profile);
} else {
dealWithJobProfile(profile);
}
@@ -377,7 +377,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* the trigger profile returned from manager should be parsed
*/
- public void dealWithTdmTriggerProfile(TriggerProfile triggerProfile) {
+ public void dealWithFileTriggerProfile(TriggerProfile triggerProfile) {
ManagerOpEnum opType =
ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP));
boolean success = true;
try {
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index d680d49f0b..59fbc0b724 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -36,9 +36,9 @@ import java.util.List;
import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
import static
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
/**
* Read text files
@@ -53,7 +53,7 @@ public class TextFileSource extends AbstractSource {
@Override
public List<Reader> split(JobProfile jobConf) {
super.init(jobConf);
- if (jobConf.hasKey(JOB_TRIGGER)) {
+ if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) {
// trigger as a special reader.
return Collections.singletonList(new TriggerFileReader());
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
index f87de408cc..3639eaf95e 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
@@ -81,7 +81,7 @@ public class TriggerFileReader implements Reader {
@Override
public void init(JobProfile jobConf) {
- this.triggerId = jobConf.get(JobConstants.JOB_TRIGGER);
+ this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER);
}
@Override
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index d78f8150e3..7ce8171c7f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -295,7 +295,7 @@ public class DirectoryTrigger implements Trigger {
Map<String, String> taskProfile = new HashMap<>();
String md5 = AgentUtils.getFileMd5(path.toFile());
taskProfile.put(path.toFile().getAbsolutePath() +
".md5", md5);
- taskProfile.put(JobConstants.JOB_TRIGGER, null); // del trigger id
+ taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id
taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS,
path.toFile().getAbsolutePath());
LOGGER.info("trigger_{} generate job profile to read
file {}",
trigger.getTriggerProfile().getTriggerId(),
path);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 954bdc1a94..91412a90a1 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -129,7 +129,7 @@ public class PluginUtils {
JobProfile copiedProfile =
TriggerProfile.parseJsonStr(triggerProfile.toJsonStr());
String md5 = AgentUtils.getFileMd5(pendingFile);
copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5);
- copiedProfile.set(JobConstants.JOB_TRIGGER, null); // del trigger id
+ copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del trigger id
copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS,
pendingFile.getAbsolutePath());
return copiedProfile;
}