This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 88770263f6 [INLONG-8645][Agent] Delete the capacity of loading trigger from local file (#8646)
88770263f6 is described below

commit 88770263f66ba7f2bd68dc6ad9044a6541db4ade
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Aug 7 17:13:28 2023 +0800

    [INLONG-8645][Agent] Delete the capacity of loading trigger from local file (#8646)
---
 .../apache/inlong/agent/constant/JobConstants.java |  2 +-
 .../org/apache/inlong/agent/core/AgentManager.java | 25 ----------------------
 .../apache/inlong/agent/core/conf/ConfigJetty.java |  6 +++---
 .../inlong/agent/core/trigger/TriggerManager.java  |  4 ++--
 .../agent/plugin/fetcher/ManagerFetcher.java       |  8 +++----
 .../agent/plugin/sources/TextFileSource.java       |  4 ++--
 .../sources/reader/file/TriggerFileReader.java     |  2 +-
 .../agent/plugin/trigger/DirectoryTrigger.java     |  2 +-
 .../inlong/agent/plugin/utils/PluginUtils.java     |  2 +-
 9 files changed, 15 insertions(+), 40 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 79c1fd4a30..92ac0529f4 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_MQ_TOPIC = "job.topicInfo";
 
     // File job
-    public static final String JOB_TRIGGER = "job.fileJob.trigger";
+    public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger";
     public static final String JOB_DIR_FILTER_PATTERN = 
"job.fileJob.dir.pattern"; // deprecated
     public static final String JOB_DIR_FILTER_PATTERNS = 
"job.fileJob.dir.patterns";
     public static final String JOB_DIR_FILTER_BLACKLIST = 
"job.fileJob.dir.blackList";
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 11f9ce793e..c3e2344b27 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -19,9 +19,7 @@ package org.apache.inlong.agent.core;
 
 import org.apache.inlong.agent.common.AbstractDaemon;
 import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.conf.ProfileFetcher;
-import org.apache.inlong.agent.conf.TriggerProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
 import org.apache.inlong.agent.core.conf.ConfigJetty;
 import org.apache.inlong.agent.core.job.JobManager;
@@ -31,7 +29,6 @@ import org.apache.inlong.agent.core.trigger.TriggerManager;
 import org.apache.inlong.agent.db.CommandDb;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.db.JobProfileDb;
-import org.apache.inlong.agent.db.LocalProfile;
 import org.apache.inlong.agent.db.TriggerProfileDb;
 
 import org.slf4j.Logger;
@@ -39,14 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.lang.reflect.Constructor;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CONF_PARENT;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_CONF_PARENT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
-
 /**
  * Agent Manager, the bridge for job manager, task manager, db e.t.c it 
manages agent level operations and communicates
  * with outside system.
@@ -63,7 +55,6 @@ public class AgentManager extends AbstractDaemon {
     private final AgentConfiguration conf;
     private final ExecutorService agentConfMonitor;
     private final Db db;
-    private final LocalProfile localProfile;
     private final CommandDb commandDb;
     private final JobProfileDb jobProfileDb;
     // jetty for config operations via http.
@@ -75,8 +66,6 @@ public class AgentManager extends AbstractDaemon {
         this.db = initDb();
         commandDb = new CommandDb(db);
         jobProfileDb = new JobProfileDb(db);
-        String parentConfPath = conf.get(AGENT_CONF_PARENT, 
DEFAULT_AGENT_CONF_PARENT);
-        localProfile = new LocalProfile(parentConfPath);
         triggerManager = new TriggerManager(this, new TriggerProfileDb(db));
         jobManager = new JobManager(this, jobProfileDb);
         taskManager = new TaskManager(this);
@@ -208,20 +197,6 @@ public class AgentManager extends AbstractDaemon {
         LOGGER.info("starting task position manager");
         positionManager.start();
         LOGGER.info("starting read job from local");
-        // read job profiles from local
-        List<JobProfile> profileList = localProfile.readFromLocal();
-        for (JobProfile profile : profileList) {
-            if (profile.hasKey(JOB_TRIGGER)) {
-                TriggerProfile triggerProfile = 
TriggerProfile.parseJobProfile(profile);
-                // there is no need to store this profile in triggerDB, because
-                // this profile comes from local file.
-                triggerManager.restoreTrigger(triggerProfile);
-            } else {
-                // job db store instance info, so it's suitable to use 
submitJobProfile
-                // to store instance into job db.
-                jobManager.submitFileJobProfile(profile);
-            }
-        }
         LOGGER.info("starting fetcher");
         if (fetcher != null) {
             fetcher.start();
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index 34e40f8fff..b62413b032 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
 
 /**
  * start http server and get job/agent config via http
@@ -87,7 +87,7 @@ public class ConfigJetty implements Closeable {
         // store job conf to bdb
         if (jobProfile != null) {
             // trigger job is a special kind of job
-            if (jobProfile.hasKey(JOB_TRIGGER)) {
+            if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
                 triggerManager.submitTrigger(
                         TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), 
true);
             } else {
@@ -123,7 +123,7 @@ public class ConfigJetty implements Closeable {
      */
     public void deleteJobConf(JobProfile jobProfile) {
         if (jobProfile != null) {
-            if (jobProfile.hasKey(JOB_TRIGGER)) {
+            if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
                 
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(),
 false);
             } else {
                 jobManager.deleteJob(jobProfile.getInstanceId(), false);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index c5c005b83f..122415a683 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -73,7 +73,7 @@ public class TriggerManager extends AbstractDaemon {
      */
     public boolean restoreTrigger(TriggerProfile triggerProfile) {
         try {
-            Class<?> triggerClass = 
Class.forName(triggerProfile.get(JobConstants.JOB_TRIGGER));
+            Class<?> triggerClass = 
Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER));
             Trigger trigger = (Trigger) triggerClass.newInstance();
             String triggerId = triggerProfile.get(JOB_ID);
             if (triggerMap.containsKey(triggerId)) {
@@ -171,7 +171,7 @@ public class TriggerManager extends AbstractDaemon {
                             // necessary to filter the stated monitored file 
task.
 
                             boolean alreadyExistTask = job.exist(tasks -> 
tasks.stream()
-                                    .filter(task -> 
!task.getJobConf().hasKey(JobConstants.JOB_TRIGGER))
+                                    .filter(task -> 
!task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER))
                                     .filter(task -> subTaskFile.equals(
                                             
task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, "")))
                                     .findAny().isPresent());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 350104e698..ac4939b147 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -75,9 +75,9 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
 import static 
org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
 import static org.apache.inlong.agent.plugin.utils.PluginUtils.copyJobProfile;
 import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
@@ -267,8 +267,8 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
                 .map(TriggerProfile::getTriggerProfiles)
                 .forEach(profile -> {
                     LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
-                    if (profile.hasKey(JOB_TRIGGER)) {
-                        dealWithTdmTriggerProfile(profile);
+                    if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) {
+                        dealWithFileTriggerProfile(profile);
                     } else {
                         dealWithJobProfile(profile);
                     }
@@ -377,7 +377,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
     /**
      * the trigger profile returned from manager should be parsed
      */
-    public void dealWithTdmTriggerProfile(TriggerProfile triggerProfile) {
+    public void dealWithFileTriggerProfile(TriggerProfile triggerProfile) {
         ManagerOpEnum opType = 
ManagerOpEnum.getOpType(triggerProfile.getInt(JOB_OP));
         boolean success = true;
         try {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index d680d49f0b..59fbc0b724 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -36,9 +36,9 @@ import java.util.List;
 import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
 import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
 import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
+import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_TRIGGER;
 
 /**
  * Read text files
@@ -53,7 +53,7 @@ public class TextFileSource extends AbstractSource {
     @Override
     public List<Reader> split(JobProfile jobConf) {
         super.init(jobConf);
-        if (jobConf.hasKey(JOB_TRIGGER)) {
+        if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) {
             // trigger as a special reader.
             return Collections.singletonList(new TriggerFileReader());
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
index f87de408cc..3639eaf95e 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
@@ -81,7 +81,7 @@ public class TriggerFileReader implements Reader {
 
     @Override
     public void init(JobProfile jobConf) {
-        this.triggerId = jobConf.get(JobConstants.JOB_TRIGGER);
+        this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER);
     }
 
     @Override
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index d78f8150e3..7ce8171c7f 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -295,7 +295,7 @@ public class DirectoryTrigger implements Trigger {
                         Map<String, String> taskProfile = new HashMap<>();
                         String md5 = AgentUtils.getFileMd5(path.toFile());
                         taskProfile.put(path.toFile().getAbsolutePath() + 
".md5", md5);
-                        taskProfile.put(JobConstants.JOB_TRIGGER, null); // 
del trigger id
+                        taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, 
null); // del trigger id
                         taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS, 
path.toFile().getAbsolutePath());
                         LOGGER.info("trigger_{} generate job profile to read 
file {}",
                                 trigger.getTriggerProfile().getTriggerId(), 
path);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 954bdc1a94..91412a90a1 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -129,7 +129,7 @@ public class PluginUtils {
         JobProfile copiedProfile = 
TriggerProfile.parseJsonStr(triggerProfile.toJsonStr());
         String md5 = AgentUtils.getFileMd5(pendingFile);
         copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5);
-        copiedProfile.set(JobConstants.JOB_TRIGGER, null); // del trigger id
+        copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del 
trigger id
         copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, 
pendingFile.getAbsolutePath());
         return copiedProfile;
     }

Reply via email to