This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 11a5b40ddf [INLONG-8655][Agent] Fix bug: JobWrapper thread leaks when the job is stopped (#8656)
11a5b40ddf is described below

commit 11a5b40ddf52e1f8b0f4ab1229a9868302defa72
Author: justinwwhuang <[email protected]>
AuthorDate: Tue Aug 8 10:38:31 2023 +0800

    [INLONG-8655][Agent] Fix bug: JobWrapper thread leaks when the job is stopped (#8656)
---
 .../apache/inlong/agent/constant/JobConstants.java |  2 +-
 .../apache/inlong/agent/core/conf/ConfigJetty.java |  6 +-
 .../apache/inlong/agent/core/job/JobManager.java   | 87 ++++++++++++++--------
 .../apache/inlong/agent/core/job/JobWrapper.java   |  1 +
 .../inlong/agent/core/trigger/TriggerManager.java  | 13 ++--
 .../agent/plugin/fetcher/ManagerFetcher.java       | 21 +++---
 .../agent/plugin/sources/TextFileSource.java       |  4 +-
 .../sources/reader/file/TriggerFileReader.java     |  2 +-
 .../agent/plugin/trigger/DirectoryTrigger.java     |  2 +-
 .../inlong/agent/plugin/utils/PluginUtils.java     |  2 +-
 10 files changed, 85 insertions(+), 55 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
index 92ac0529f4..a2fff7885c 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java
@@ -51,7 +51,7 @@ public class JobConstants extends CommonConstants {
     public static final String JOB_MQ_TOPIC = "job.topicInfo";
 
     // File job
-    public static final String JOB_FILE_JOB_TRIGGER = "job.fileJob.trigger";
+    public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger";
     public static final String JOB_DIR_FILTER_PATTERN = 
"job.fileJob.dir.pattern"; // deprecated
     public static final String JOB_DIR_FILTER_PATTERNS = 
"job.fileJob.dir.patterns";
     public static final String JOB_DIR_FILTER_BLACKLIST = 
"job.fileJob.dir.blackList";
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
index b62413b032..5b58315b43 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/conf/ConfigJetty.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 
-import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_SOURCE_TYPE;
 
 /**
@@ -87,7 +87,7 @@ public class ConfigJetty implements Closeable {
         // store job conf to bdb
         if (jobProfile != null) {
             // trigger job is a special kind of job
-            if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
+            if (jobProfile.hasKey(JOB_FILE_TRIGGER)) {
                 triggerManager.submitTrigger(
                         TriggerProfile.parseJsonStr(jobProfile.toJsonStr()), 
true);
             } else {
@@ -123,7 +123,7 @@ public class ConfigJetty implements Closeable {
      */
     public void deleteJobConf(JobProfile jobProfile) {
         if (jobProfile != null) {
-            if (jobProfile.hasKey(JOB_FILE_JOB_TRIGGER)) {
+            if (jobProfile.hasKey(JOB_FILE_TRIGGER)) {
                 
triggerManager.deleteTrigger(TriggerProfile.parseJobProfile(jobProfile).getTriggerId(),
 false);
             } else {
                 jobManager.deleteJob(jobProfile.getInstanceId(), false);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index c1e933d8ae..9381a4e3c4 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -108,45 +108,44 @@ public class JobManager extends AbstractDaemon {
         this.dimensions = new HashMap<>();
         this.dimensions.put(KEY_COMPONENT_NAME, 
this.getClass().getSimpleName());
         this.jobMetrics = new 
AgentMetricItemSet(this.getClass().getSimpleName());
-        MetricRegister.unregister(jobMetrics);
         MetricRegister.register(jobMetrics);
     }
 
     /**
-     * submit job to work thread.
+     * add file job profile
      *
-     * @param job job
+     * @param profile job profile.
      */
-    private void addJob(Job job) {
-        if (pendingJobs.containsKey(job.getJobInstanceId())) {
-            return;
-        }
-        try {
-            JobWrapper jobWrapper = new JobWrapper(agentManager, job);
-            JobWrapper jobWrapperRet = 
jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper);
-            if (jobWrapperRet != null) {
-                LOGGER.warn("{} has been added to running pool, "
-                        + "cannot be added repeatedly", 
job.getJobInstanceId());
-                return;
-            } else {
-                getJobMetric().jobRunningCount.incrementAndGet();
-            }
-            this.runningPool.execute(jobWrapper);
-        } catch (Exception rje) {
-            LOGGER.debug("reject job {}", job.getJobInstanceId(), rje);
-            pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
-        } catch (Throwable t) {
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
-        }
+    public boolean submitFileJobProfile(JobProfile profile) {
+        return submitJobProfile(profile, false, true);
     }
 
     /**
-     * add file job profile
+     * make up file job
      *
      * @param profile job profile.
      */
-    public boolean submitFileJobProfile(JobProfile profile) {
-        return submitJobProfile(profile, false, true);
+    public void makeUpJob(JobProfile profile, boolean singleJob) {
+        LOGGER.error("need to make up job {}", profile);
+        if (!isJobValid(profile)) {
+            LOGGER.error("make up job failed, invalid profile {}", profile);
+            return;
+        }
+        String jobId = profile.get(JOB_ID);
+        if (singleJob) {
+            profile.set(JOB_INSTANCE_ID, 
AgentUtils.getSingleJobId(JOB_ID_PREFIX, jobId));
+        } else {
+            profile.set(JOB_INSTANCE_ID, AgentUtils.getUniqId(JOB_ID_PREFIX, 
jobId, index.incrementAndGet()));
+        }
+        JobProfile jobFromDb = 
jobProfileDb.getJobById(profile.getInstanceId());
+        if (jobFromDb == null) {
+            jobProfileDb.storeJobFirstTime(profile);
+        } else {
+            jobFromDb.set(JOB_VERSION, profile.get(JOB_VERSION));
+            profile = jobFromDb;
+        }
+        LOGGER.info("submit job final profile {}", profile.toJsonStr());
+        addJobToMemory(new Job(profile));
     }
 
     /**
@@ -177,10 +176,38 @@ public class JobManager extends AbstractDaemon {
             }
         }
         LOGGER.info("submit job final profile {}", profile.toJsonStr());
-        addJob(new Job(profile));
+        addJobToMemory(new Job(profile));
         return true;
     }
 
+    /**
+     * submit job to work thread.
+     *
+     * @param job job
+     */
+    private void addJobToMemory(Job job) {
+        if (pendingJobs.containsKey(job.getJobInstanceId())) {
+            return;
+        }
+        try {
+            JobWrapper jobWrapper = new JobWrapper(agentManager, job);
+            JobWrapper jobWrapperRet = 
jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper);
+            if (jobWrapperRet != null) {
+                LOGGER.warn("{} has been added to running pool, "
+                        + "cannot be added repeatedly", 
job.getJobInstanceId());
+                return;
+            } else {
+                getJobMetric().jobRunningCount.incrementAndGet();
+            }
+            this.runningPool.execute(jobWrapper);
+        } catch (Exception rje) {
+            LOGGER.debug("reject job {}", job.getJobInstanceId(), rje);
+            pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
+        } catch (Throwable t) {
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+        }
+    }
+
     private boolean isJobValid(JobProfile profile) {
         if (profile == null || !profile.allRequiredKeyExist()) {
             LOGGER.error("profile is null or not all required key exists {}", 
profile == null ? null
@@ -227,7 +254,7 @@ public class JobManager extends AbstractDaemon {
         List<JobProfile> profileList = jobProfileDb.getRestartJobs();
         for (JobProfile profile : profileList) {
             LOGGER.info("init starting job from db {}", profile.toJsonStr());
-            addJob(new Job(profile));
+            addJobToMemory(new Job(profile));
         }
     }
 
@@ -242,7 +269,7 @@ public class JobManager extends AbstractDaemon {
                     for (String jobId : pendingJobs.keySet()) {
                         Job job = pendingJobs.remove(jobId);
                         if (job != null) {
-                            addJob(job);
+                            addJobToMemory(job);
                         }
                     }
                     TimeUnit.SECONDS.sleep(monitorInterval);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
index 29a496a6f5..397812624b 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobWrapper.java
@@ -204,6 +204,7 @@ public class JobWrapper extends AbstractStateWrapper {
     public void cleanup() {
         isEnd = true;
         allTasks.forEach(task -> taskManager.removeTask(task.getTaskId()));
+        allTasks.clear();
     }
 
     @Override
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
index 122415a683..786bcf62cd 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/trigger/TriggerManager.java
@@ -67,13 +67,13 @@ public class TriggerManager extends AbstractDaemon {
     }
 
     /**
-     * Restore trigger task.
+     * add trigger task to memory.
      *
      * @param triggerProfile trigger profile
      */
-    public boolean restoreTrigger(TriggerProfile triggerProfile) {
+    public boolean addTriggerToMemory(TriggerProfile triggerProfile) {
         try {
-            Class<?> triggerClass = 
Class.forName(triggerProfile.get(JobConstants.JOB_FILE_JOB_TRIGGER));
+            Class<?> triggerClass = 
Class.forName(triggerProfile.get(JobConstants.JOB_FILE_TRIGGER));
             Trigger trigger = (Trigger) triggerClass.newInstance();
             String triggerId = triggerProfile.get(JOB_ID);
             if (triggerMap.containsKey(triggerId)) {
@@ -120,7 +120,7 @@ public class TriggerManager extends AbstractDaemon {
         // This action must be done before saving in db, because the 
job.instance.id is needed for the next recovery
         manager.getJobManager().submitJobProfile(triggerProfile, true, 
isNewJob);
         triggerProfileDB.storeTrigger(triggerProfile);
-        restoreTrigger(triggerProfile);
+        addTriggerToMemory(triggerProfile);
     }
 
     /**
@@ -160,6 +160,7 @@ public class TriggerManager extends AbstractDaemon {
                             JobWrapper job = 
jobWrapperMap.get(trigger.getTriggerProfile().getInstanceId());
                             if (job == null) {
                                 LOGGER.error("job {} should not be null", 
trigger.getTriggerProfile().getInstanceId());
+                                
manager.getJobManager().makeUpJob(trigger.getTriggerProfile(), true);
                                 return;
                             }
                             String subTaskFile = 
profile.getOrDefault(JobConstants.JOB_DIR_FILTER_PATTERNS, "");
@@ -171,7 +172,7 @@ public class TriggerManager extends AbstractDaemon {
                             // necessary to filter the stated monitored file 
task.
 
                             boolean alreadyExistTask = job.exist(tasks -> 
tasks.stream()
-                                    .filter(task -> 
!task.getJobConf().hasKey(JobConstants.JOB_FILE_JOB_TRIGGER))
+                                    .filter(task -> 
!task.getJobConf().hasKey(JobConstants.JOB_FILE_TRIGGER))
                                     .filter(task -> subTaskFile.equals(
                                             
task.getJobConf().get(JobConstants.JOB_DIR_FILTER_PATTERNS, "")))
                                     .findAny().isPresent());
@@ -202,7 +203,7 @@ public class TriggerManager extends AbstractDaemon {
         // fetch all triggers from db
         List<TriggerProfile> profileList = triggerProfileDB.getTriggers();
         for (TriggerProfile profile : profileList) {
-            restoreTrigger(profile);
+            addTriggerToMemory(profile);
         }
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index ac4939b147..1dd85bc15d 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -75,7 +75,7 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MA
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_TDM_VIP_HTTP_PATH;
 import static org.apache.inlong.agent.constant.FetcherConstants.VERSION;
-import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_OP;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_RETRY_TIME;
 import static 
org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
@@ -95,7 +95,7 @@ public class ManagerFetcher extends AbstractDaemon implements 
ProfileFetcher {
     private static final int MAX_RETRY = 2;
     private final String managerVipUrl;
     private final String baseManagerUrl;
-    private final String managerTaskUrl;
+    private final String fetchAndReportFileTaskUrl;
     private final String managerIpsCheckUrl;
     private final String managerDbCollectorTaskUrl;
     private final AgentConfiguration conf;
@@ -115,7 +115,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
             httpManager = new HttpManager(conf);
             baseManagerUrl = buildBaseUrl();
             managerVipUrl = buildVipUrl(baseManagerUrl);
-            managerTaskUrl = buildFileCollectTaskUrl(baseManagerUrl);
+            fetchAndReportFileTaskUrl = 
buildFileCollectTaskUrl(baseManagerUrl);
             managerIpsCheckUrl = buildIpCheckUrl(baseManagerUrl);
             managerDbCollectorTaskUrl = 
buildDbCollectorGetTaskUrl(baseManagerUrl);
             uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
@@ -132,7 +132,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build base url for manager according to config
-     * <p>
+     *
      * example - http://127.0.0.1:8080/inlong/manager/openapi
      */
     private String buildBaseUrl() {
@@ -143,7 +143,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build vip url for manager according to config
-     * <p>
+     *
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/agent/getManagerIpList
      */
     private String buildVipUrl(String baseUrl) {
@@ -152,7 +152,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build file collect task url for manager according to config
-     * <p>
+     *
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
      */
     private String buildFileCollectTaskUrl(String baseUrl) {
@@ -161,7 +161,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build ip check url for manager according to config
-     * <p>
+     *
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/confirmAgentIp
      */
     private String buildIpCheckUrl(String baseUrl) {
@@ -170,7 +170,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
 
     /**
      * build db collector get task url for manager according to config
-     * <p>
+     *
      * example - 
http://127.0.0.1:8080/inlong/manager/openapi/dbcollector/getTask
      */
     private String buildDbCollectorGetTaskUrl(String baseUrl) {
@@ -209,7 +209,8 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
     public void fetchCommand() {
         LOGGER.info("fetchCommand start");
         List<CommandEntity> unackedCommands = commandDb.getUnackedCommands();
-        String resultStr = httpManager.doSentPost(managerTaskUrl, 
getFetchRequest(unackedCommands));
+        String resultStr = httpManager.doSentPost(fetchAndReportFileTaskUrl,
+                getFetchRequest(unackedCommands));
         JsonObject resultData = getResultData(resultStr);
         JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
         if (element != null) {
@@ -267,7 +268,7 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
                 .map(TriggerProfile::getTriggerProfiles)
                 .forEach(profile -> {
                     LOGGER.info("the triggerProfile: {}", profile.toJsonStr());
-                    if (profile.hasKey(JOB_FILE_JOB_TRIGGER)) {
+                    if (profile.hasKey(JOB_FILE_TRIGGER)) {
                         dealWithFileTriggerProfile(profile);
                     } else {
                         dealWithJobProfile(profile);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 59fbc0b724..4dfae0c06b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -36,7 +36,7 @@ import java.util.List;
 import static org.apache.inlong.agent.constant.CommonConstants.POSITION_SUFFIX;
 import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_LINE_FILTER;
 import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_JOB_READ_WAIT_TIMEOUT;
-import static 
org.apache.inlong.agent.constant.JobConstants.JOB_FILE_JOB_TRIGGER;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_FILE_TRIGGER;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_LINE_FILTER_PATTERN;
 import static 
org.apache.inlong.agent.constant.JobConstants.JOB_READ_WAIT_TIMEOUT;
 
@@ -53,7 +53,7 @@ public class TextFileSource extends AbstractSource {
     @Override
     public List<Reader> split(JobProfile jobConf) {
         super.init(jobConf);
-        if (jobConf.hasKey(JOB_FILE_JOB_TRIGGER)) {
+        if (jobConf.hasKey(JOB_FILE_TRIGGER)) {
             // trigger as a special reader.
             return Collections.singletonList(new TriggerFileReader());
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
index 3639eaf95e..23d2311ae7 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/TriggerFileReader.java
@@ -81,7 +81,7 @@ public class TriggerFileReader implements Reader {
 
     @Override
     public void init(JobProfile jobConf) {
-        this.triggerId = jobConf.get(JobConstants.JOB_FILE_JOB_TRIGGER);
+        this.triggerId = jobConf.get(JobConstants.JOB_FILE_TRIGGER);
     }
 
     @Override
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
index 7ce8171c7f..38e688713a 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/trigger/DirectoryTrigger.java
@@ -295,7 +295,7 @@ public class DirectoryTrigger implements Trigger {
                         Map<String, String> taskProfile = new HashMap<>();
                         String md5 = AgentUtils.getFileMd5(path.toFile());
                         taskProfile.put(path.toFile().getAbsolutePath() + 
".md5", md5);
-                        taskProfile.put(JobConstants.JOB_FILE_JOB_TRIGGER, 
null); // del trigger id
+                        taskProfile.put(JobConstants.JOB_FILE_TRIGGER, null); 
// del trigger id
                         taskProfile.put(JobConstants.JOB_DIR_FILTER_PATTERNS, 
path.toFile().getAbsolutePath());
                         LOGGER.info("trigger_{} generate job profile to read 
file {}",
                                 trigger.getTriggerProfile().getTriggerId(), 
path);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
index 91412a90a1..6d84f6d520 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java
@@ -129,7 +129,7 @@ public class PluginUtils {
         JobProfile copiedProfile = 
TriggerProfile.parseJsonStr(triggerProfile.toJsonStr());
         String md5 = AgentUtils.getFileMd5(pendingFile);
         copiedProfile.set(pendingFile.getAbsolutePath() + ".md5", md5);
-        copiedProfile.set(JobConstants.JOB_FILE_JOB_TRIGGER, null); // del 
trigger id
+        copiedProfile.set(JobConstants.JOB_FILE_TRIGGER, null); // del trigger 
id
         copiedProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, 
pendingFile.getAbsolutePath());
         return copiedProfile;
     }

Reply via email to