This is an automated email from the ASF dual-hosted git repository.
wenweihuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6c48c78dd3 [INLONG-10399][Agent] Add global configurations updater
(#10400)
6c48c78dd3 is described below
commit 6c48c78dd36d1119e1c40b1204ab9219c2ce1504
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jun 13 15:22:37 2024 +0800
[INLONG-10399][Agent] Add global configurations updater (#10400)
* [INLONG-10399][Agent] Add global configurations updater
* [INLONG-10399][Agent] Add global configurations updater
---
.../inlong/agent/constant/FetcherConstants.java | 5 +-
.../org/apache/inlong/agent/core/AgentManager.java | 19 +++++
.../agent/plugin/fetcher/ManagerFetcher.java | 83 +++++++++++++---------
.../common/pojo/agent/AgentConfigRequest.java | 1 +
4 files changed, 72 insertions(+), 36 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
index 727748ae05..00c77f3ca6 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java
@@ -36,8 +36,9 @@ public class FetcherConstants {
public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH =
"/inlong/manager/openapi";
public static final String AGENT_MANAGER_TASK_HTTP_PATH =
"agent.manager.task.http.path";
- public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH =
"/agent/reportAndGetTask";
- public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH =
"/agent/getExistTaskConfig";
+ public static final String DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH =
"/agent/getExistTaskConfig";
+ public static final String AGENT_MANAGER_CONFIG_HTTP_PATH =
"agent.manager.config.http.path";
+ public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH =
"/agent/getConfig";
public static final String INSTALLER_MANAGER_CONFIG_HTTP_PATH =
"installer.manager.config.http.path";
public static final String DEFAULT_INSTALLER_MANAGER_CONFIG_HTTP_PATH =
"/installer/getConfig";
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
index ca5247fe59..6e8deba0ae 100755
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentManager.java
@@ -22,14 +22,17 @@ import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.ProfileFetcher;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.core.task.TaskManager;
+import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.reflect.Constructor;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
/**
* Agent Manager, the bridge for task manager, task store e.t.c it manages
agent level operations and communicates
@@ -43,6 +46,8 @@ public class AgentManager extends AbstractDaemon {
private final ProfileFetcher fetcher;
private final AgentConfiguration conf;
private final ExecutorService agentConfMonitor;
+ public static final int CONFIG_QUEUE_CAPACITY = 2;
+ private static BlockingQueue<AgentConfigInfo> configQueue = new
LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
public AgentManager() {
conf = AgentConfiguration.getAgentConf();
@@ -52,6 +57,20 @@ public class AgentManager extends AbstractDaemon {
heartbeatManager = HeartbeatManager.getInstance(this);
}
+ public static AgentConfigInfo getAgentConfigInfo() {
+ return configQueue.peek();
+ }
+
+ public void subNewAgentConfigInfo(AgentConfigInfo info) {
+ if (info == null) {
+ return;
+ }
+ if (configQueue.size() == CONFIG_QUEUE_CAPACITY) {
+ configQueue.poll();
+ }
+ configQueue.add(info);
+ }
+
/**
* init fetch by class name
*/
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
index f5fad95d23..812e4c2513 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java
@@ -26,8 +26,9 @@ import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.PullJobTypeEnum;
+import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
+import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
import org.apache.inlong.common.pojo.agent.DataConfig;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
@@ -46,15 +47,17 @@ import java.util.Date;
import java.util.List;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
+import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
import static org.apache.inlong.agent.constant.AgentConstants.AGENT_UNIQ_ID;
import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_UNIQ_ID;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_FETCHER_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_ADDR;
+import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_CONFIG_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_RETURN_PARAM_DATA;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_TASK_HTTP_PATH;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_FETCHER_INTERVAL;
import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH;
-import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH;
+import static
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH;
import static
org.apache.inlong.agent.plugin.fetcher.ManagerResultFormatter.getResultData;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalIp;
import static org.apache.inlong.agent.utils.AgentUtils.fetchLocalUuid;
@@ -69,14 +72,15 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
private static final GsonBuilder gsonBuilder = new
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Gson GSON = gsonBuilder.create();
private final String baseManagerUrl;
- private final String taskConfigUrl;
private final String staticConfigUrl;
+ private final String agentConfigInfoUrl;
private final AgentConfiguration conf;
private final String uniqId;
private final AgentManager agentManager;
private final HttpManager httpManager;
private String localIp;
private String uuid;
+ private String clusterTag;
private String clusterName;
public ManagerFetcher(AgentManager agentManager) {
@@ -85,9 +89,10 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
if (requiredKeys(conf)) {
httpManager = new HttpManager(conf);
baseManagerUrl = httpManager.getBaseUrl();
- taskConfigUrl = buildTaskConfigUrl(baseManagerUrl);
staticConfigUrl = buildStaticConfigUrl(baseManagerUrl);
+ agentConfigInfoUrl = buildAgentConfigInfoUrl(baseManagerUrl);
uniqId = conf.get(AGENT_UNIQ_ID, DEFAULT_AGENT_UNIQ_ID);
+ clusterTag = conf.get(AGENT_CLUSTER_TAG);
clusterName = conf.get(AGENT_CLUSTER_NAME);
} else {
throw new RuntimeException("init manager error, cannot find
required key");
@@ -101,68 +106,74 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
/**
* Build task config url for manager according to config
*
- * example -
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
+ * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getTaskConf
*/
- private String buildTaskConfigUrl(String baseUrl) {
- return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH,
DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH);
+ private String buildStaticConfigUrl(String baseUrl) {
+ return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH,
DEFAULT_AGENT_MANAGER_EXIST_TASK_HTTP_PATH);
}
/**
- * Build task config url for manager according to config
+ * Build agent config info url for manager according to config
*
- * example -
http://127.0.0.1:8080/inlong/manager/openapi/fileAgent/getTaskConf
+ * example - http://127.0.0.1:8080/inlong/manager/openapi/agent/getConfig
*/
- private String buildStaticConfigUrl(String baseUrl) {
- return baseUrl + conf.get(AGENT_MANAGER_TASK_HTTP_PATH,
DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH);
+ private String buildAgentConfigInfoUrl(String baseUrl) {
+ return baseUrl + conf.get(AGENT_MANAGER_CONFIG_HTTP_PATH,
DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH);
}
/**
- * Request manager to get commands, make sure it is not throwing exceptions
+ * Request manager to get task config, make sure it is not throwing
exceptions
*/
- public TaskResult fetchTaskConfig() {
- LOGGER.info("fetchTaskConfig start");
- String resultStr = httpManager.doSentPost(taskConfigUrl,
getFetchRequest(null));
+ public TaskResult getStaticConfig() {
+ LOGGER.info("Get static config start");
+ String resultStr = httpManager.doSentPost(staticConfigUrl,
getTaskRequest());
+ LOGGER.info("Url to get static config staticConfigUrl {}",
staticConfigUrl);
JsonObject resultData = getResultData(resultStr);
JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
- LOGGER.info("fetchTaskConfig end");
+ LOGGER.info("Get static config end");
if (element != null) {
- LOGGER.info("fetchTaskConfig not null {}", resultData);
+ LOGGER.info("Get static config not null {}", resultData);
return GSON.fromJson(element.getAsJsonObject(), TaskResult.class);
} else {
- LOGGER.info("fetchTaskConfig nothing to do");
+ LOGGER.info("Get static config nothing to do");
return null;
}
}
/**
- * Request manager to get commands, make sure it is not throwing exceptions
+ * Request manager to get config, make sure it is not throwing exceptions
*/
- public TaskResult getStaticConfig() {
- LOGGER.info("Get static config start");
- String resultStr = httpManager.doSentPost(staticConfigUrl,
getFetchRequest(null));
- LOGGER.info("Url to get static config staticConfigUrl {}",
staticConfigUrl);
+ public AgentConfigInfo getAgentConfigInfo() {
+ LOGGER.info("Get agent config info");
+ String resultStr = httpManager.doSentPost(agentConfigInfoUrl,
getAgentConfigInfoRequest());
+ LOGGER.info("Url to get agent config agentConfigInfoUrl {}",
agentConfigInfoUrl);
JsonObject resultData = getResultData(resultStr);
JsonElement element = resultData.get(AGENT_MANAGER_RETURN_PARAM_DATA);
- LOGGER.info("Get static config end");
+ LOGGER.info("Get agent config end");
if (element != null) {
- LOGGER.info("Get static config not null {}", resultData);
- return GSON.fromJson(element.getAsJsonObject(), TaskResult.class);
+ LOGGER.info("Get agent config not null {}", resultData);
+ return GSON.fromJson(element.getAsJsonObject(),
AgentConfigInfo.class);
} else {
- LOGGER.info("Get static config nothing to do");
+ LOGGER.info("Get agent config nothing to do");
return null;
}
}
- /**
- * Form file command fetch request
- */
- public TaskRequest getFetchRequest(List<CommandEntity> unackedCommands) {
+ public TaskRequest getTaskRequest() {
TaskRequest request = new TaskRequest();
request.setAgentIp(localIp);
request.setUuid(uuid);
request.setClusterName(clusterName);
request.setPullJobType(PullJobTypeEnum.NEW.getType());
- request.setCommandInfo(unackedCommands);
+ request.setCommandInfo(null);
+ return request;
+ }
+
+ public AgentConfigRequest getAgentConfigInfoRequest() {
+ AgentConfigRequest request = new AgentConfigRequest();
+ request.setClusterTag(clusterTag);
+ request.setClusterName(clusterName);
+ request.setIp(localIp);
return request;
}
@@ -171,7 +182,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
*
* @return runnable profile.
*/
- private Runnable taskConfigFetchThread() {
+ private Runnable configFetchThread() {
return () -> {
Thread.currentThread().setName("ManagerFetcher");
while (isRunnable()) {
@@ -185,6 +196,10 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
});
agentManager.getTaskManager().submitTaskProfiles(taskProfiles);
}
+ AgentConfigInfo config = getAgentConfigInfo();
+ if (config != null) {
+ agentManager.subNewAgentConfigInfo(config);
+ }
} catch (Throwable ex) {
LOGGER.warn("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
@@ -243,7 +258,7 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
// when agent start, check local ip and fetch manager ip list;
localIp = fetchLocalIp();
uuid = fetchLocalUuid();
- submitWorker(taskConfigFetchThread());
+ submitWorker(configFetchThread());
}
@Override
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
index f81bd3a281..857583bc86 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/agent/AgentConfigRequest.java
@@ -33,4 +33,5 @@ public class AgentConfigRequest {
private String clusterName;
+ private String ip;
}