This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8faf2b37 [INLONG-6759][Agent] Improve code style (#6760)
f8faf2b37 is described below
commit f8faf2b379b37e5a3f37e0ffb7fd52e8fa2b6ce7
Author: xueyingzhang <[email protected]>
AuthorDate: Tue Dec 6 14:59:36 2022 +0800
[INLONG-6759][Agent] Improve code style (#6760)
---
.../apache/inlong/agent/common/AbstractDaemon.java | 2 +-
.../inlong/agent/conf/AbstractConfiguration.java | 5 ++
.../inlong/agent/constant/CommandConstants.java | 57 ----------------------
.../apache/inlong/agent}/except/FileException.java | 2 +-
.../inlong/agent/message/DefaultMessage.java | 2 +-
.../inlong/agent}/message/PackProxyMessage.java | 4 +-
.../apache/inlong/agent/message/ProxyMessage.java | 12 ++---
.../org/apache/inlong/agent/utils/HttpManager.java | 16 ++++++
.../org/apache/inlong/agent/core/AgentManager.java | 6 +--
.../apache/inlong/agent/core/HeartbeatManager.java | 41 ++++++++--------
.../java/org/apache/inlong/agent/core/job/Job.java | 1 -
.../apache/inlong/agent/core/job/JobManager.java | 20 ++++----
.../org/apache/inlong/agent/core/task/Task.java | 6 +++
.../apache/inlong/agent/core/task/TaskManager.java | 3 ++
.../agent/core/task/TaskPositionManager.java | 11 +++--
.../agent/plugin/fetcher/ManagerFetcher.java | 4 +-
.../inlong/agent/plugin/sinks/AbstractSink.java | 2 +-
.../inlong/agent/plugin/sinks/ProxySink.java | 4 +-
.../inlong/agent/plugin/sinks/PulsarSink.java | 8 +--
.../inlong/agent/plugin/sinks/SenderManager.java | 7 ++-
.../plugin/sources/reader/AbstractReader.java | 3 +-
.../sources/reader/file/FileReaderOperator.java | 2 +-
.../apache/inlong/agent/plugin/sinks/MockSink.java | 7 ++-
23 files changed, 96 insertions(+), 129 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
index 653720d14..450606154 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AbstractDaemon.java
@@ -44,7 +44,7 @@ public abstract class AbstractDaemon implements Service {
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new
AgentThreadFactory("AbstractDaemon"));
private final List<CompletableFuture<?>> workerFutures;
- private boolean runnable = true;
+ private volatile boolean runnable = true;
public AbstractDaemon() {
this.workerFutures = new ArrayList<>();
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
index 46b67dd7d..62403b982 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/AbstractConfiguration.java
@@ -202,6 +202,11 @@ public abstract class AbstractConfiguration {
return value == null ? defaultValue : value.getAsBoolean();
}
+ public float getFloat(String key, float defaultValue) {
+ JsonElement value = configStorage.get(key);
+ return value == null ? defaultValue : value.getAsFloat();
+ }
+
/**
* get string
*
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommandConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommandConstants.java
deleted file mode 100644
index 31a9c3ec7..000000000
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommandConstants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.constant;
-
-/**
- * old version of command result
- */
-public class CommandConstants {
-
- public static final String ID = "id";
-
- public static final String IDC = "idc";
- public static final String IP = "ip";
- public static final String OP = "op";
- public static final String OUTPUT_TYPE = "output_type";
- public static final String DATA_NAME = "data_name";
- public static final String CHECK_NAME = "check_name";
- public static final String HAVING_CHECK = "havingcheck";
- public static final String USING_VERIFY = "usingverify";
- public static final String ADDICTIVE_ATTR = "addictive_attr";
- public static final String TIME_OFFSET = "time_offset";
- public static final String SCHEDULE_TIME = "schedule_time";
- public static final String SPECIFIED_DATA_TIME = "specified_data_time";
- public static final String TASK_TYPE = "task_type";
- public static final String MAX_FILE_NUM = "max_file_num";
- public static final String TOPIC = "topic";
- public static final String SEQUENCIAL = "sequencial";
- public static final String DELAY_MILL_SEC = "delay_mill_sec";
- public static final String STOP_TIME_MILL_SEC = "stop_time_mill_sec";
- public static final String SCAN_INTERVAL_SEC = "scan_interval_sec";
- public static final String SCAN_DURA_SEC = "scan_dura_sec";
- public static final String CYCLE_UNIT = "cycle_unit";
- public static final String DRIVER_CLASS_NAME = "driver_class_name";
- public static final String THREAD_CLASS_NAME = "thread_class_name";
- public static final String FILTER_CLASS_NAME = "filter_class_name";
- public static final String CLUSTER_ID = "clusterId";
- public static final String BUSINESS_ID = "business_id";
- public static final String INTERFACE_ID = "interface_id";
- public static final String FIELD_SPLITTER = "field_splitter";
- public static final String MD5 = "md5";
-
-}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/except/FileException.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/except/FileException.java
similarity index 95%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/except/FileException.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/except/FileException.java
index e86c22308..0325820ab 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/except/FileException.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/except/FileException.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.except;
+package org.apache.inlong.agent.except;
public class FileException extends RuntimeException {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
index fb34a5dc0..cbf17247c 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/DefaultMessage.java
@@ -30,7 +30,7 @@ import java.util.Map;
public class DefaultMessage implements Message {
private final byte[] body;
- private final Map<String, String> header;
+ protected final Map<String, String> header;
public DefaultMessage(byte[] body, Map<String, String> header) {
this.body = body;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
similarity index 97%
rename from
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
rename to
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
index d51ba2ae0..6eb54cd1c 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/PackProxyMessage.java
@@ -15,11 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.plugin.message;
+package org.apache.inlong.agent.message;
import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.message.BatchProxyMessage;
-import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.common.msg.AttributeConstants;
import org.slf4j.Logger;
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
index f07d8ceba..d8c0e1d42 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/ProxyMessage.java
@@ -38,7 +38,7 @@ public class ProxyMessage implements Message {
private final String inlongStreamId;
// determine the group key when making batch
private final String batchKey;
- private String dataKey;
+ private final String dataKey;
public ProxyMessage(byte[] body, Map<String, String> header) {
this.body = body;
@@ -50,14 +50,8 @@ public class ProxyMessage implements Message {
this.batchKey = dataKey + inlongStreamId;
}
- /**
- * Transform Message to ProxyMessage
- *
- * @param message Message
- * @return ProxyMessage
- */
- public static ProxyMessage parse(Message message) {
- return new ProxyMessage(message.getBody(), message.getHeader());
+ public ProxyMessage(Message message) {
+ this(message.getBody(), message.getHeader());
}
public String getDataKey() {
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
index 4bc1ccaa5..98f6e1b8b 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/HttpManager.java
@@ -39,7 +39,11 @@ import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HTTP_SUCCE
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
/**
* Perform http operation
@@ -48,6 +52,7 @@ public class HttpManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HttpManager.class);
private static final Gson gson;
+ private static final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
static {
final GsonBuilder gsonBuilder = new
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -65,6 +70,17 @@ public class HttpManager {
secretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
}
+ /**
+ * build base url for manager according to config
+ *
+ * example - http://127.0.0.1:8080/inlong/manager/openapi
+ */
+ public static String buildBaseUrl() {
+ return "http://" + agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)
+ + ":" + agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)
+ + agentConf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH,
DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH);
+ }
+
/**
* construct http client
*
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index 1716cf050..2a1231e34 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -71,12 +71,12 @@ public class AgentManager extends AbstractDaemon {
jobProfileDb = new JobProfileDb(db);
String parentConfPath = conf.get(AGENT_CONF_PARENT,
DEFAULT_AGENT_CONF_PARENT);
localProfile = new LocalProfile(parentConfPath);
- fetcher = initFetcher(this);
triggerManager = new TriggerManager(this, new TriggerProfileDb(db));
jobManager = new JobManager(this, jobProfileDb);
taskManager = new TaskManager(this);
- heartbeatManager = new HeartbeatManager(this);
- taskPositionManager = TaskPositionManager.getTaskPositionManager(this);
+ fetcher = initFetcher(this);
+ heartbeatManager = HeartbeatManager.getInstance(this);
+ taskPositionManager = TaskPositionManager.getInstance(this);
// need to be an option.
if (conf.getBoolean(
AgentConstants.AGENT_ENABLE_HTTP,
AgentConstants.DEFAULT_AGENT_ENABLE_HTTP)) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index ffae9eb4d..5eadee6e6 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -54,13 +54,9 @@ import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_HEARTBEAT_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
import static org.apache.inlong.agent.constant.JobConstants.JOB_GROUP_ID;
import static org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
@@ -70,8 +66,7 @@ import static
org.apache.inlong.agent.constant.JobConstants.JOB_STREAM_ID;
public class HeartbeatManager extends AbstractDaemon implements
AbstractHeartbeatManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(HeartbeatManager.class);
-
- private final AgentManager agentManager;
+ private static HeartbeatManager heartbeatManager = null;
private final JobManager jobmanager;
private final AgentConfiguration conf;
private final HttpManager httpManager;
@@ -83,16 +78,33 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
/**
* Init heartbeat manager.
*/
- public HeartbeatManager(AgentManager agentManager) {
+ private HeartbeatManager(AgentManager agentManager) {
this.conf = AgentConfiguration.getAgentConf();
- this.agentManager = agentManager;
jobmanager = agentManager.getJobManager();
httpManager = new HttpManager(conf);
- baseManagerUrl = buildBaseUrl();
+ baseManagerUrl = HttpManager.buildBaseUrl();
reportSnapshotUrl = buildReportSnapShotUrl(baseManagerUrl);
reportHeartbeatUrl = buildReportHeartbeatUrl(baseManagerUrl);
}
+ public static HeartbeatManager getInstance(AgentManager agentManager) {
+ if (heartbeatManager == null) {
+ synchronized (HeartbeatManager.class) {
+ if (heartbeatManager == null) {
+ heartbeatManager = new HeartbeatManager(agentManager);
+ }
+ }
+ }
+ return heartbeatManager;
+ }
+
+ public static HeartbeatManager getInstance() {
+ if (heartbeatManager == null) {
+ throw new RuntimeException("HeartbeatManager has not been
initialized by agentManager");
+ }
+ return heartbeatManager;
+ }
+
@Override
public void start() throws Exception {
submitWorker(snapshotReportThread());
@@ -226,17 +238,6 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
return heartbeatMsg;
}
- /**
- * build base url for manager according to config
- *
- * example - http://127.0.0.1:8080/inlong/manager/openapi
- */
- private String buildBaseUrl() {
- return "http://" + conf.get(AGENT_MANAGER_VIP_HTTP_HOST)
- + ":" + conf.get(AGENT_MANAGER_VIP_HTTP_PORT)
- + conf.get(AGENT_MANAGER_VIP_HTTP_PREFIX_PATH,
DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH);
- }
-
private String buildReportSnapShotUrl(String baseUrl) {
return baseUrl
+ conf.get(AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH,
DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH);
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
index 125fd7897..35c3f6695 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/Job.java
@@ -45,7 +45,6 @@ public class Job {
// job description
private String description;
protected String jobInstanceId;
- protected List<Task> taskList = new ArrayList<>();
protected ThreadLocal<Integer> threadNum = new ThreadLocal<Integer>() {
protected Integer initialValue() {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
index 42dce296e..31792616c 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/job/JobManager.java
@@ -79,6 +79,8 @@ public class JobManager extends AbstractDaemon {
private final AgentMetricItemSet jobMetrics;
private final Map<String, String> dimensions;
+ private final AgentConfiguration agentConf;
+
/**
* init job manager
*
@@ -88,20 +90,18 @@ public class JobManager extends AbstractDaemon {
this.jobProfileDb = jobProfileDb;
this.agentManager = agentManager;
// job thread pool for running
- this.runningPool = new ThreadPoolExecutor(
- 0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- new AgentThreadFactory("job"));
+ this.runningPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
TimeUnit.SECONDS,
+ new SynchronousQueue<>(), new AgentThreadFactory("job"));
this.jobs = new ConcurrentHashMap<>();
+ this.agentConf = AgentConfiguration.getAgentConf();
this.pendingJobs = new ConcurrentHashMap<>();
- AgentConfiguration conf = AgentConfiguration.getAgentConf();
- this.monitorInterval = conf
+ this.monitorInterval = agentConf
.getInt(
AgentConstants.JOB_MONITOR_INTERVAL,
AgentConstants.DEFAULT_JOB_MONITOR_INTERVAL);
- this.jobDbCacheTime = conf.getLong(JOB_DB_CACHE_TIME,
DEFAULT_JOB_DB_CACHE_TIME);
- this.jobDbCacheCheckInterval =
conf.getLong(JOB_DB_CACHE_CHECK_INTERVAL, DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL);
- this.jobMaxSize = conf.getLong(JOB_NUMBER_LIMIT,
DEFAULT_JOB_NUMBER_LIMIT);
+ this.jobDbCacheTime = agentConf.getLong(JOB_DB_CACHE_TIME,
DEFAULT_JOB_DB_CACHE_TIME);
+ this.jobDbCacheCheckInterval =
agentConf.getLong(JOB_DB_CACHE_CHECK_INTERVAL,
+ DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL);
+ this.jobMaxSize = agentConf.getLong(JOB_NUMBER_LIMIT,
DEFAULT_JOB_NUMBER_LIMIT);
this.dimensions = new HashMap<>();
this.dimensions.put(KEY_COMPONENT_NAME,
this.getClass().getSimpleName());
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
index a81a940a9..b0c4e0fc9 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/Task.java
@@ -32,6 +32,7 @@ public class Task {
private final Sink sink;
private final Channel channel;
private final JobProfile jobConf;
+ private volatile boolean isInited = false;
public Task(String taskId, Reader reader, Sink sink, Channel channel,
JobProfile jobConf) {
@@ -70,6 +71,11 @@ public class Task {
this.channel.init(jobConf);
this.sink.init(jobConf);
this.reader.init(jobConf);
+ isInited = true;
+ }
+
+ public boolean isTaskFinishInit() {
+ return isInited;
}
public void destroy() {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
index a08f8f0a2..4c3fc10e7 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskManager.java
@@ -201,6 +201,9 @@ public class TaskManager extends AbstractDaemon {
* @param taskId task id
*/
public void removeTask(String taskId) {
+ if (taskId == null) {
+ return;
+ }
getTaskMetrics().taskRunningCount.decrementAndGet();
TaskWrapper taskWrapper = tasks.remove(taskId);
if (taskWrapper != null) {
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
index 1d0c86657..06951aab6 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/TaskPositionManager.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.AgentManager;
import org.apache.inlong.agent.db.JobProfileDb;
+import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +58,7 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* task position manager singleton, can only generated by agent manager
*/
- public static TaskPositionManager getTaskPositionManager(AgentManager
agentManager) {
+ public static TaskPositionManager getInstance(AgentManager agentManager) {
if (taskPositionManager == null) {
synchronized (TaskPositionManager.class) {
if (taskPositionManager == null) {
@@ -71,7 +72,7 @@ public class TaskPositionManager extends AbstractDaemon {
/**
* get taskPositionManager singleton
*/
- public static TaskPositionManager getTaskPositionManager() {
+ public static TaskPositionManager getInstance() {
if (taskPositionManager == null) {
throw new RuntimeException("task position manager has not been
initialized by agentManager");
}
@@ -134,11 +135,11 @@ public class TaskPositionManager extends AbstractDaemon {
*
* @param size add this size to beforePosition
*/
- public void updateSinkPosition(String jobInstanceId, String sourcePath,
long size) {
+ public void updateSinkPosition(BatchProxyMessage batchMsg, String
sourcePath, long size) {
ConcurrentHashMap<String, Long> positionTemp = new
ConcurrentHashMap<>();
- ConcurrentHashMap<String, Long> position =
jobTaskPositionMap.putIfAbsent(jobInstanceId, positionTemp);
+ ConcurrentHashMap<String, Long> position =
jobTaskPositionMap.putIfAbsent(batchMsg.getJobId(), positionTemp);
if (position == null) {
- JobProfile jobProfile = jobConfDb.getJobById(jobInstanceId);
+ JobProfile jobProfile = jobConfDb.getJobById(batchMsg.getJobId());
positionTemp.put(sourcePath, jobProfile.getLong(sourcePath +
POSITION_SUFFIX, 0));
position = positionTemp;
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index 1ea04ca6a..703574c68 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -469,8 +469,8 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
*/
private String confirmLocalIps(List<String> localIps) {
ConfirmAgentIpRequest request = new ConfirmAgentIpRequest(AGENT,
localIps);
- JsonObject resultData =
getResultData(httpManager.doSentPost(managerIpsCheckUrl, request))
- .get(AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonObject();
+ JsonObject resultData =
getResultData(httpManager.doSentPost(managerIpsCheckUrl, request)).get(
+ AGENT_MANAGER_RETURN_PARAM_DATA).getAsJsonObject();
if (!resultData.has(AGENT_MANAGER_RETURN_PARAM_IP)) {
throw new IllegalArgumentException("cannot get ip from data " +
resultData.getAsString());
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
index 8b6858f78..036463dd1 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
@@ -18,11 +18,11 @@
package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
-import org.apache.inlong.agent.plugin.message.PackProxyMessage;
import org.apache.inlong.common.metric.MetricRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index f8eff052d..5050e39dd 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -22,10 +22,10 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.plugin.message.PackProxyMessage;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.slf4j.Logger;
@@ -67,7 +67,7 @@ public class ProxySink extends AbstractSink {
message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
inlongStreamId);
extractStreamFromMessage(message, fieldSplitter);
if (!(message instanceof EndMessage)) {
- ProxyMessage proxyMessage = ProxyMessage.parse(message);
+ ProxyMessage proxyMessage = new ProxyMessage(message);
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
index 43b88e423..031ee73e0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -27,10 +27,10 @@ import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.PackProxyMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.message.PackProxyMessage;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.msg.InLongMsg;
@@ -118,7 +118,7 @@ public class PulsarSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- taskPositionManager = TaskPositionManager.getTaskPositionManager();
+ taskPositionManager = TaskPositionManager.getInstance();
// agentConf
sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE,
DEFAULT_SEND_QUEUE_SIZE);
sendQueueSemaphore = new Semaphore(sendQueueSize);
@@ -159,7 +159,7 @@ public class PulsarSink extends AbstractSink {
try {
if (message != null) {
if (!(message instanceof EndMessage)) {
- ProxyMessage proxyMessage = ProxyMessage.parse(message);
+ ProxyMessage proxyMessage = new ProxyMessage(message);
// add proxy message to cache.
cache.compute(proxyMessage.getBatchKey(),
(s, packProxyMessage) -> {
@@ -319,7 +319,7 @@ public class PulsarSink extends AbstractSink {
batchMsg.getTotalSize());
sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
if (sourceName != null) {
- taskPositionManager.updateSinkPosition(batchMsg.getJobId(),
sourceName, batchMsg.getMsgCnt());
+ taskPositionManager.updateSinkPosition(batchMsg, sourceName,
batchMsg.getMsgCnt());
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index e64b8b543..a58ac78e1 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -131,7 +131,7 @@ public class SenderManager {
retrySleepTime = jobConf.getLong(
CommonConstants.PROXY_RETRY_SLEEP,
CommonConstants.DEFAULT_PROXY_RETRY_SLEEP);
isFile = jobConf.getBoolean(CommonConstants.PROXY_IS_FILE,
CommonConstants.DEFAULT_IS_FILE);
- taskPositionManager = TaskPositionManager.getTaskPositionManager();
+ taskPositionManager = TaskPositionManager.getInstance();
semaphore = new
Semaphore(jobConf.getInt(CommonConstants.PROXY_MESSAGE_SEMAPHORE,
CommonConstants.DEFAULT_PROXY_MESSAGE_SEMAPHORE));
ioThreadNum =
jobConf.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM,
@@ -161,7 +161,6 @@ public class SenderManager {
private AgentMetricItem getMetricItem(String groupId, String streamId) {
Map<String, String> dims = new HashMap<>();
- dims.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
dims.put(KEY_INLONG_GROUP_ID, groupId);
dims.put(KEY_INLONG_STREAM_ID, streamId);
return getMetricItem(dims);
@@ -288,7 +287,7 @@ public class SenderManager {
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, msgCnt,
batchMessage.getTotalSize());
if (sourcePath != null) {
-
taskPositionManager.updateSinkPosition(batchMessage.getJobId(), sourcePath,
msgCnt);
+ taskPositionManager.updateSinkPosition(batchMessage,
sourcePath, msgCnt);
}
} else {
metricItem.pluginSendFailCount.addAndGet(msgCnt);
@@ -343,7 +342,7 @@ public class SenderManager {
batchMessage.getTotalSize());
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
if (sourcePath != null) {
- taskPositionManager.updateSinkPosition(jobId, sourcePath,
msgCnt);
+ taskPositionManager.updateSinkPosition(batchMessage,
sourcePath, msgCnt);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
index 6d14eb10c..53cda8752 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/AbstractReader.java
@@ -47,6 +47,7 @@ import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
public abstract class AbstractReader implements Reader {
protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractReader.class);
protected String inlongGroupId;
protected String inlongStreamId;
// metric
@@ -55,8 +56,6 @@ public abstract class AbstractReader implements Reader {
protected String metricName;
protected Map<String, String> dimensions;
- private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractReader.class);
-
@Override
public void init(JobProfile jobConf) {
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID,
DEFAULT_PROXY_INLONG_GROUP_ID);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 33e8816cc..076f617b8 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -24,7 +24,7 @@ import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.plugin.except.FileException;
+import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
import org.apache.inlong.agent.plugin.validator.PatternValidator;
import org.apache.inlong.agent.utils.AgentUtils;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
index bb303107b..9fa5dc80f 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sinks;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -50,7 +51,9 @@ public class MockSink extends AbstractSink {
public void write(Message message) {
if (message != null) {
number.incrementAndGet();
- taskPositionManager.updateSinkPosition(jobInstanceId,
sourceFileName, 1);
+ BatchProxyMessage msg = new BatchProxyMessage();
+ msg.setJobId(jobInstanceId);
+ taskPositionManager.updateSinkPosition(msg, sourceFileName, 1);
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
} else {
@@ -71,7 +74,7 @@ public class MockSink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
- taskPositionManager = TaskPositionManager.getTaskPositionManager();
+ taskPositionManager = TaskPositionManager.getInstance();
jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
dataTime =
AgentUtils.timeStrConvertToMillSec(jobConf.get(JOB_DATA_TIME, ""),
jobConf.get(JOB_CYCLE_UNIT, ""));