This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f8faf2b37 [INLONG-6759][Agent] Improve code style (#6760)
f8faf2b37 is described below

commit f8faf2b379b37e5a3f37e0ffb7fd52e8fa2b6ce7
Author: xueyingzhang <[email protected]>
AuthorDate: Tue Dec 6 14:59:36 2022 +0800

    [INLONG-6759][Agent] Improve code style (#6760)
---
 .../apache/inlong/agent/common/AbstractDaemon.java |  2 +-
 .../inlong/agent/conf/AbstractConfiguration.java   |  5 ++
 .../inlong/agent/constant/CommandConstants.java    | 57 ----------------------
 .../apache/inlong/agent}/except/FileException.java |  2 +-
 .../inlong/agent/message/DefaultMessage.java       |  2 +-
 .../inlong/agent}/message/PackProxyMessage.java    |  4 +-
 .../apache/inlong/agent/message/ProxyMessage.java  | 12 ++---
 .../org/apache/inlong/agent/utils/HttpManager.java | 16 ++++++
 .../org/apache/inlong/agent/core/AgentManager.java |  6 +--
 .../apache/inlong/agent/core/HeartbeatManager.java | 41 ++++++++--------
 .../java/org/apache/inlong/agent/core/job/Job.java |  1 -
 .../apache/inlong/agent/core/job/JobManager.java   | 20 ++++----
 .../org/apache/inlong/agent/core/task/Task.java    |  6 +++
 .../apache/inlong/agent/core/task/TaskManager.java |  3 ++
 .../agent/core/task/TaskPositionManager.java       | 11 +++--
 .../agent/plugin/fetcher/ManagerFetcher.java       |  4 +-
 .../inlong/agent/plugin/sinks/AbstractSink.java    |  2 +-
 .../inlong/agent/plugin/sinks/ProxySink.java       |  4 +-
 .../inlong/agent/plugin/sinks/PulsarSink.java      |  8 +--
 .../inlong/agent/plugin/sinks/SenderManager.java   |  7 ++-
 .../plugin/sources/reader/AbstractReader.java      |  3 +-
 .../sources/reader/file/FileReaderOperator.java    |  2 +-
 .../apache/inlong/agent/plugin/sinks/MockSink.java |  7 ++-
 23 files changed, 96 insertions(+), 129 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
index 653720d14..450606154 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
@@ -44,7 +44,7 @@ public abstract class AbstractDaemon implements Service {
                     60L, TimeUnit.SECONDS,
                     new SynchronousQueue<Runnable>(), new 
AgentThreadFactory("AbstractDaemon"));
     private final List<CompletableFuture<?>> workerFutures;
-    private boolean runnable = true;
+    private volatile boolean runnable = true;
 
     public AbstractDaemon() {
         this.workerFutures = new ArrayList<>();
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
index 46b67dd7d..62403b982 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
@@ -202,6 +202,11 @@ public abstract class AbstractConfiguration {
         return value == null ? defaultValue : value.getAsBoolean();
     }
 
+    public float getFloat(String key, float defaultValue) {
+        JsonElement value = configStorage.get(key);
+        return value == null ? defaultValue : value.getAsFloat();
+    }
+
     /**
      * get string
      *
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommandConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommandConstants.java
deleted file mode 100644
index 31a9c3ec7..000000000
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommandConstants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.constant;
-
-/**
- * old version of command result
- */
-public class CommandConstants {
-
-    public static final String ID = "id";
-
-    public static final String IDC = "idc";
-    public static final String IP = "ip";
-    public static final String OP = "op";
-    public static final String OUTPUT_TYPE = "output_type";
-    public static final String DATA_NAME = "data_name";
-    public static final String CHECK_NAME = "check_name";
-    public static final String HAVING_CHECK = "havingcheck";
-    public static final String USING_VERIFY = "usingverify";
-    public static final String ADDICTIVE_ATTR = "addictive_attr";
-    public static final String TIME_OFFSET = "time_offset";
-    public static final String SCHEDULE_TIME = "schedule_time";
-    public static final String SPECIFIED_DATA_TIME = "specified_data_time";
-    public static final String TASK_TYPE = "task_type";
-    public static final String MAX_FILE_NUM = "max_file_num";
-    public static final String TOPIC = "topic";
-    public static final String SEQUENCIAL = "sequencial";
-    public static final String DELAY_MILL_SEC = "delay_mill_sec";
-    public static final String STOP_TIME_MILL_SEC = "stop_time_mill_sec";
-    public static final String SCAN_INTERVAL_SEC = "scan_interval_sec";
-    public static final String SCAN_DURA_SEC = "scan_dura_sec";
-    public static final String CYCLE_UNIT = "cycle_unit";
-    public static final String DRIVER_CLASS_NAME = "driver_class_name";
-    public static final String THREAD_CLASS_NAME = "thread_class_name";
-    public static final String FILTER_CLASS_NAME = "filter_class_name";
-    public static final String CLUSTER_ID = "clusterId";
-    public static final String BUSINESS_ID = "business_id";
-    public static final String INTERFACE_ID = "interface_id";
-    public static final String FIELD_SPLITTER = "field_splitter";
-    public static final String MD5 = "md5";
-
-}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/except/FileException.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/except/FileException.java
similarity index 95%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/except/FileException.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/except/FileException.java
index e86c22308..0325820ab 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/except/FileException.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/except/FileException.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.except;
+package org.apache.inlong.agent.except;
 
 public class FileException extends RuntimeException {
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
index fb34a5dc0..cbf17247c 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
@@ -30,7 +30,7 @@ import java.util.Map;
 public class DefaultMessage implements Message {
 
     private final byte[] body;
-    private final Map<String, String> header;
+    protected final Map<String, String> header;
 
     public DefaultMessage(byte[] body, Map<String, String> header) {
         this.body = body;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
similarity index 97%
rename from 
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
rename to 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index d51ba2ae0..6eb54cd1c 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -15,11 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.plugin.message;
+package org.apache.inlong.agent.message;
 
 import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.message.BatchProxyMessage;
-import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.msg.AttributeConstants;
 import org.slf4j.Logger;
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
index f07d8ceba..d8c0e1d42 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
@@ -38,7 +38,7 @@ public class ProxyMessage implements Message {
     private final String inlongStreamId;
     // determine the group key when making batch
     private final String batchKey;
-    private String dataKey;
+    private final String dataKey;
 
     public ProxyMessage(byte[] body, Map<String, String> header) {
         this.body = body;
@@ -50,14 +50,8 @@ public class ProxyMessage implements Message {
         this.batchKey = dataKey + inlongStreamId;
     }
 
-    /**
-     * Transform Message to ProxyMessage
-     *
-     * @param message Message
-     * @return ProxyMessage
-     */
-    public static ProxyMessage parse(Message message) {
-        return new ProxyMessage(message.getBody(), message.getHeader());
+    public ProxyMessage(Message message) {
+        this(message.getBody(), message.getHeader());
     }
 
     public String getDataKey() {
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
index 4bc1ccaa5..98f6e1b8b 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
@@ -39,7 +39,11 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_SUCCE
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
 
 /**
  * Perform http operation
@@ -48,6 +52,7 @@ public class HttpManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HttpManager.class);
     private static final Gson gson;
+    private static final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
 
     static {
         final GsonBuilder gsonBuilder = new 
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -65,6 +70,17 @@ public class HttpManager {
         secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
     }
 
+    /**
+     * build base url for manager according to config
+     *
+     * example - http://127.0.0.1:8080/inlong/manager/openapi
+     */
+    public static String buildBaseUrl() {
+        return "http://"; + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)
+                + ":" + agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)
+                + agentConf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, 
DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH);
+    }
+
     /**
      * construct http client
      *
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 1716cf050..2a1231e34 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -71,12 +71,12 @@ public class AgentManager extends AbstractDaemon {
         jobProfileDb = new JobProfileDb(db);
         String parentConfPath = conf.get(AGENT_CONF_PARENT, 
DEFAULT_AGENT_CONF_PARENT);
         localProfile = new LocalProfile(parentConfPath);
-        fetcher = initFetcher(this);
         triggerManager = new TriggerManager(this, new TriggerProfileDb(db));
         jobManager = new JobManager(this, jobProfileDb);
         taskManager = new TaskManager(this);
-        heartbeatManager = new HeartbeatManager(this);
-        taskPositionManager = TaskPositionManager.getTaskPositionManager(this);
+        fetcher = initFetcher(this);
+        heartbeatManager = HeartbeatManager.getInstance(this);
+        taskPositionManager = TaskPositionManager.getInstance(this);
         // need to be an option.
         if (conf.getBoolean(
                 AgentConstants.AGENT_ENABLE_HTTP, 
AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index ffae9eb4d..5eadee6e6 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -54,13 +54,9 @@ import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
 import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
 
@@ -70,8 +66,7 @@ import static 
org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
 public class HeartbeatManager extends AbstractDaemon implements 
AbstractHeartbeatManager {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(HeartbeatManager.class);
-
-    private final AgentManager agentManager;
+    private static HeartbeatManager heartbeatManager = null;
     private final JobManager jobmanager;
     private final AgentConfiguration conf;
     private final HttpManager httpManager;
@@ -83,16 +78,33 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
     /**
      * Init heartbeat manager.
      */
-    public HeartbeatManager(AgentManager agentManager) {
+    private HeartbeatManager(AgentManager agentManager) {
         this.conf = AgentConfiguration.getAgentConf();
-        this.agentManager = agentManager;
         jobmanager = agentManager.getJobManager();
         httpManager = new HttpManager(conf);
-        baseManagerUrl = buildBaseUrl();
+        baseManagerUrl = HttpManager.buildBaseUrl();
         reportSnapshotUrl = buildReportSnapShotUrl(baseManagerUrl);
         reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
     }
 
+    public static HeartbeatManager getInstance(AgentManager agentManager) {
+        if (heartbeatManager == null) {
+            synchronized (HeartbeatManager.class) {
+                if (heartbeatManager == null) {
+                    heartbeatManager = new HeartbeatManager(agentManager);
+                }
+            }
+        }
+        return heartbeatManager;
+    }
+
+    public static HeartbeatManager getInstance() {
+        if (heartbeatManager == null) {
+            throw new RuntimeException("HeartbeatManager has not been 
initialized by agentManager");
+        }
+        return heartbeatManager;
+    }
+
     @Override
     public void start() throws Exception {
         submitWorker(snapshotReportThread());
@@ -226,17 +238,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         return heartbeatMsg;
     }
 
-    /**
-     * build base url for manager according to config
-     *
-     * example - http://127.0.0.1:8080/inlong/manager/openapi
-     */
-    private String buildBaseUrl() {
-        return "http://"; + conf.get(AGENT_MANAGER_VIP_HTTP_HOST)
-                + ":" + conf.get(AGENT_MANAGER_VIP_HTTP_PORT)
-                + conf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH, 
DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH);
-    }
-
     private String buildReportSnapShotUrl(String baseUrl) {
         return baseUrl
                 + conf.get(AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH, 
DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH);
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 125fd7897..35c3f6695 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -45,7 +45,6 @@ public class Job {
     // job description
     private String description;
     protected String jobInstanceId;
-    protected List<Task> taskList = new ArrayList<>();
     protected ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
 
         protected Integer initialValue() {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 42dce296e..31792616c 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -79,6 +79,8 @@ public class JobManager extends AbstractDaemon {
     private final AgentMetricItemSet jobMetrics;
     private final Map<String, String> dimensions;
 
+    private final AgentConfiguration agentConf;
+
     /**
      * init job manager
      *
@@ -88,20 +90,18 @@ public class JobManager extends AbstractDaemon {
         this.jobProfileDb = jobProfileDb;
         this.agentManager = agentManager;
         // job thread pool for running
-        this.runningPool = new ThreadPoolExecutor(
-                0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<>(),
-                new AgentThreadFactory("job"));
+        this.runningPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, 
TimeUnit.SECONDS,
+                new SynchronousQueue<>(), new AgentThreadFactory("job"));
         this.jobs = new ConcurrentHashMap<>();
+        this.agentConf = AgentConfiguration.getAgentConf();
         this.pendingJobs = new ConcurrentHashMap<>();
-        AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        this.monitorInterval = conf
+        this.monitorInterval = agentConf
                 .getInt(
                         AgentConstants.JOB_MONITOR_INTERVAL, 
AgentConstants.DEFAULT_JOB_MONITOR_INTERVAL);
-        this.jobDbCacheTime = conf.getLong(JOB_DB_CACHE_TIME, 
DEFAULT_JOB_DB_CACHE_TIME);
-        this.jobDbCacheCheckInterval = 
conf.getLong(JOB_DB_CACHE_CHECK_INTERVAL, DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL);
-        this.jobMaxSize = conf.getLong(JOB_NUMBER_LIMIT, 
DEFAULT_JOB_NUMBER_LIMIT);
+        this.jobDbCacheTime = agentConf.getLong(JOB_DB_CACHE_TIME, 
DEFAULT_JOB_DB_CACHE_TIME);
+        this.jobDbCacheCheckInterval = 
agentConf.getLong(JOB_DB_CACHE_CHECK_INTERVAL,
+                DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL);
+        this.jobMaxSize = agentConf.getLong(JOB_NUMBER_LIMIT, 
DEFAULT_JOB_NUMBER_LIMIT);
 
         this.dimensions = new HashMap<>();
         this.dimensions.put(KEY_COMPONENT_NAME, 
this.getClass().getSimpleName());
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
index a81a940a9..b0c4e0fc9 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
@@ -32,6 +32,7 @@ public class Task {
     private final Sink sink;
     private final Channel channel;
     private final JobProfile jobConf;
+    private volatile boolean isInited = false;
 
     public Task(String taskId, Reader reader, Sink sink, Channel channel,
             JobProfile jobConf) {
@@ -70,6 +71,11 @@ public class Task {
         this.channel.init(jobConf);
         this.sink.init(jobConf);
         this.reader.init(jobConf);
+        isInited = true;
+    }
+
+    public boolean isTaskFinishInit() {
+        return isInited;
     }
 
     public void destroy() {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index a08f8f0a2..4c3fc10e7 100755
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -201,6 +201,9 @@ public class TaskManager extends AbstractDaemon {
      * @param taskId task id
      */
     public void removeTask(String taskId) {
+        if (taskId == null) {
+            return;
+        }
         getTaskMetrics().taskRunningCount.decrementAndGet();
         TaskWrapper taskWrapper = tasks.remove(taskId);
         if (taskWrapper != null) {
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 1d0c86657..06951aab6 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.AgentManager;
 import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +58,7 @@ public class TaskPositionManager extends AbstractDaemon {
     /**
      * task position manager singleton, can only generated by agent manager
      */
-    public static TaskPositionManager getTaskPositionManager(AgentManager 
agentManager) {
+    public static TaskPositionManager getInstance(AgentManager agentManager) {
         if (taskPositionManager == null) {
             synchronized (TaskPositionManager.class) {
                 if (taskPositionManager == null) {
@@ -71,7 +72,7 @@ public class TaskPositionManager extends AbstractDaemon {
     /**
      * get taskPositionManager singleton
      */
-    public static TaskPositionManager getTaskPositionManager() {
+    public static TaskPositionManager getInstance() {
         if (taskPositionManager == null) {
             throw new RuntimeException("task position manager has not been 
initialized by agentManager");
         }
@@ -134,11 +135,11 @@ public class TaskPositionManager extends AbstractDaemon {
      *
      * @param size add this size to beforePosition
      */
-    public void updateSinkPosition(String jobInstanceId, String sourcePath, 
long size) {
+    public void updateSinkPosition(BatchProxyMessage batchMsg, String 
sourcePath, long size) {
         ConcurrentHashMap<String, Long> positionTemp = new 
ConcurrentHashMap<>();
-        ConcurrentHashMap<String, Long> position = 
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+        ConcurrentHashMap<String, Long> position = 
jobTaskPositionMap.putIfAbsent(batchMsg.getJobId(), positionTemp);
         if (position == null) {
-            JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+            JobProfile jobProfile = jobConfDb.getJobById(batchMsg.getJobId());
             positionTemp.put(sourcePath, jobProfile.getLong(sourcePath + 
POSITION_SUFFIX, 0));
             position = positionTemp;
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 1ea04ca6a..703574c68 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -469,8 +469,8 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
      */
     private String confirmLocalIps(List<String> localIps) {
         ConfirmAgentIpRequest request = new ConfirmAgentIpRequest(AGENT, 
localIps);
-        JsonObject resultData = 
getResultData(httpManager.doSentPost(managerIpsCheckUrl, request))
-                .get(AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonObject();
+        JsonObject resultData = 
getResultData(httpManager.doSentPost(managerIpsCheckUrl, request)).get(
+                AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonObject();
         if (!resultData.has(AGENT_MANAGER_RETURN_PARAM_IP)) {
             throw new IllegalArgumentException("cannot get ip from data " + 
resultData.getAsString());
         }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
index 8b6858f78..036463dd1 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
@@ -18,11 +18,11 @@
 package org.apache.inlong.agent.plugin.sinks;
 
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.PackProxyMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.message.PackProxyMessage;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index f8eff052d..5050e39dd 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -22,10 +22,10 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.PackProxyMessage;
 import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.plugin.message.PackProxyMessage;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.slf4j.Logger;
@@ -67,7 +67,7 @@ public class ProxySink extends AbstractSink {
                 message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
                 extractStreamFromMessage(message, fieldSplitter);
                 if (!(message instanceof EndMessage)) {
-                    ProxyMessage proxyMessage = ProxyMessage.parse(message);
+                    ProxyMessage proxyMessage = new ProxyMessage(message);
                     // add proxy message to cache.
                     cache.compute(proxyMessage.getBatchKey(),
                             (s, packProxyMessage) -> {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index 43b88e423..031ee73e0 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -27,10 +27,10 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.PackProxyMessage;
 import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.message.PackProxyMessage;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
 import org.apache.inlong.common.msg.InLongMsg;
@@ -118,7 +118,7 @@ public class PulsarSink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        taskPositionManager = TaskPositionManager.getTaskPositionManager();
+        taskPositionManager = TaskPositionManager.getInstance();
         // agentConf
         sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, 
DEFAULT_SEND_QUEUE_SIZE);
         sendQueueSemaphore = new Semaphore(sendQueueSize);
@@ -159,7 +159,7 @@ public class PulsarSink extends AbstractSink {
         try {
             if (message != null) {
                 if (!(message instanceof EndMessage)) {
-                    ProxyMessage proxyMessage = ProxyMessage.parse(message);
+                    ProxyMessage proxyMessage = new ProxyMessage(message);
                     // add proxy message to cache.
                     cache.compute(proxyMessage.getBatchKey(),
                             (s, packProxyMessage) -> {
@@ -319,7 +319,7 @@ public class PulsarSink extends AbstractSink {
                 batchMsg.getTotalSize());
         sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
         if (sourceName != null) {
-            taskPositionManager.updateSinkPosition(batchMsg.getJobId(), 
sourceName, batchMsg.getMsgCnt());
+            taskPositionManager.updateSinkPosition(batchMsg, sourceName, 
batchMsg.getMsgCnt());
         }
     }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index e64b8b543..a58ac78e1 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -131,7 +131,7 @@ public class SenderManager {
         retrySleepTime = jobConf.getLong(
                 CommonConstants.PROXY_RETRY_SLEEP, 
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
         isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE, 
CommonConstants.DEFAULT_IS_FILE);
-        taskPositionManager = TaskPositionManager.getTaskPositionManager();
+        taskPositionManager = TaskPositionManager.getInstance();
         semaphore = new 
Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE,
                 CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE));
         ioThreadNum = 
jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
@@ -161,7 +161,6 @@ public class SenderManager {
 
     private AgentMetricItem getMetricItem(String groupId, String streamId) {
         Map<String, String> dims = new HashMap<>();
-        dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
         dims.put(KEY_INLONG_GROUP_ID, groupId);
         dims.put(KEY_INLONG_STREAM_ID, streamId);
         return getMetricItem(dims);
@@ -288,7 +287,7 @@ public class SenderManager {
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, msgCnt,
                         batchMessage.getTotalSize());
                 if (sourcePath != null) {
-                    
taskPositionManager.updateSinkPosition(batchMessage.getJobId(), sourcePath, 
msgCnt);
+                    taskPositionManager.updateSinkPosition(batchMessage, 
sourcePath, msgCnt);
                 }
             } else {
                 metricItem.pluginSendFailCount.addAndGet(msgCnt);
@@ -343,7 +342,7 @@ public class SenderManager {
                     batchMessage.getTotalSize());
             getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
             if (sourcePath != null) {
-                taskPositionManager.updateSinkPosition(jobId, sourcePath, 
msgCnt);
+                taskPositionManager.updateSinkPosition(batchMessage, 
sourcePath, msgCnt);
             }
         }
 
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 6d14eb10c..53cda8752 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -47,6 +47,7 @@ import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
 public abstract class AbstractReader implements Reader {
 
     protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractReader.class);
     protected String inlongGroupId;
     protected String inlongStreamId;
     // metric
@@ -55,8 +56,6 @@ public abstract class AbstractReader implements Reader {
     protected String metricName;
     protected Map<String, String> dimensions;
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractReader.class);
-
     @Override
     public void init(JobProfile jobConf) {
         inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, 
DEFAULT_PROXY_INLONG_GROUP_ID);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 33e8816cc..076f617b8 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -24,7 +24,7 @@ import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.except.FileException;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
 import org.apache.inlong.agent.utils.AgentUtils;
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index bb303107b..9fa5dc80f 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sinks;
 
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -50,7 +51,9 @@ public class MockSink extends AbstractSink {
     public void write(Message message) {
         if (message != null) {
             number.incrementAndGet();
-            taskPositionManager.updateSinkPosition(jobInstanceId, 
sourceFileName, 1);
+            BatchProxyMessage msg = new BatchProxyMessage();
+            msg.setJobId(jobInstanceId);
+            taskPositionManager.updateSinkPosition(msg, sourceFileName, 1);
             // increment the count of successful sinks
             sinkMetric.sinkSuccessCount.incrementAndGet();
         } else {
@@ -71,7 +74,7 @@ public class MockSink extends AbstractSink {
 
     @Override
     public void init(JobProfile jobConf) {
-        taskPositionManager = TaskPositionManager.getTaskPositionManager();
+        taskPositionManager = TaskPositionManager.getInstance();
         jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
         dataTime = 
AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
                 jobConf.get(JOB_CYCLE_UNIT, ""));


Reply via email to