This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 88d3958e12 [INLONG-9858][Agent] Increase local read and write
capabilities for module config (#9892)
88d3958e12 is described below
commit 88d3958e127e6d5bcccfabdfb6ccc1f91b7e96c2
Author: justinwwhuang <[email protected]>
AuthorDate: Fri Mar 29 11:36:51 2024 +0800
[INLONG-9858][Agent] Increase local read and write capabilities for module
config (#9892)
---
.../inlong/agent/installer/ModuleManager.java | 106 +++++++++++++++++----
1 file changed, 85 insertions(+), 21 deletions(-)
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index 303d9bb0f3..dd33a2ed70 100755
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -18,6 +18,7 @@
package org.apache.inlong.agent.installer;
import org.apache.inlong.agent.common.AbstractDaemon;
+import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.installer.conf.InstallerConfiguration;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.utils.AgentUtils;
@@ -28,12 +29,27 @@ import
org.apache.inlong.common.pojo.agent.installer.ModuleConfig;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_REQUEST_TIMEOUT;
import static
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PREFIX_PATH;
@@ -50,27 +66,24 @@ public class ModuleManager extends AbstractDaemon {
public static final String MANAGER_AUTH_SECRET_ID =
"manager.auth.secretId";
public static final String MANAGER_AUTH_SECRET_KEY =
"manager.auth.secretKey";
public static final int CONFIG_QUEUE_CAPACITY = 1;
- public static final int CORE_THREAD_SLEEP_TIME = 1000;
+ public static final int CORE_THREAD_SLEEP_TIME = 10000;
+ public static final int DOWNLOAD_PACKAGE_READ_BUFF_SIZE = 1024 * 1024;
+ public static final String LOCAL_CONFIG_FILE = "modules.json";
private static final Logger LOGGER =
LoggerFactory.getLogger(ModuleManager.class);
private final InstallerConfiguration conf;
+ private final String confPath;
private final BlockingQueue<ConfigResult> configQueue;
private String currentMd5 = "";
- private String currentStoragePath = "/tmp";
- private ClassLoader classLoader;
+ private Map<Integer, ModuleConfig> currentModules = new
ConcurrentHashMap<>();
private static final GsonBuilder gsonBuilder = new
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Gson GSON = gsonBuilder.create();
- private final HttpManager httpManager;
+ private HttpManager httpManager;
public ModuleManager() {
conf = InstallerConfiguration.getInstallerConf();
+ confPath = conf.get(AgentConstants.AGENT_HOME,
AgentConstants.DEFAULT_AGENT_HOME) + "/conf/";
configQueue = new LinkedBlockingQueue<>(CONFIG_QUEUE_CAPACITY);
- classLoader = Thread.currentThread().getContextClassLoader();
- if (classLoader == null) {
- classLoader = ModuleManager.class.getClassLoader();
- }
- if (requiredKeys(conf)) {
- httpManager = getHttpManager(conf);
- } else {
+ if (!requiredKeys(conf)) {
throw new RuntimeException("init module manager error, cannot find
required key");
}
}
@@ -97,7 +110,7 @@ public class ModuleManager extends AbstractDaemon {
configQueue.clear();
for (int i = 0; i < config.getModuleList().size(); i++) {
LOGGER.info("submitModules index {} total {} {}", i,
config.getModuleList().size(),
- config.getModuleList().get(i));
+ GSON.toJson(config.getModuleList().get(i)));
}
configQueue.add(config);
}
@@ -106,51 +119,103 @@ public class ModuleManager extends AbstractDaemon {
return currentMd5;
}
+ public ModuleConfig getModule(Integer moduleId) {
+ return currentModules.get(moduleId);
+ }
+
/**
- * thread for core thread.
+ * Thread for core thread.
*
* @return runnable profile.
*/
private Runnable coreThread() {
return () -> {
- Thread.currentThread().setName("task-manager-core");
+ Thread.currentThread().setName("module-manager-core");
+ restoreFromLocalFile(confPath);
while (isRunnable()) {
try {
- AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
dealWithConfigQueue(configQueue);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_TASK_MGR_HEARTBEAT, "", "",
AgentUtils.getCurrentTime(), 1, 1);
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
} catch (Throwable ex) {
LOGGER.error("exception caught", ex);
ThreadUtils.threadThrowableHandler(Thread.currentThread(),
ex);
+ AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
}
}
};
}
+ public void restoreFromLocalFile(String confPath) {
+ LOGGER.info("restore modules from local file");
+ String localModuleConfigPath = confPath + LOCAL_CONFIG_FILE;
+ try (Reader reader = new InputStreamReader(
+ new FileInputStream(localModuleConfigPath),
StandardCharsets.UTF_8)) {
+ JsonElement tmpElement =
JsonParser.parseReader(reader).getAsJsonObject();
+ ConfigResult curConfig =
GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class);
+ if (curConfig.getMd5() != null && curConfig.getModuleList() !=
null) {
+ currentMd5 = curConfig.getMd5();
+ curConfig.getModuleList().forEach((module) -> {
+ currentModules.put(module.getId(), module);
+ });
+ } else {
+ LOGGER.info("modules in local file invalid");
+ }
+ } catch (FileNotFoundException e) {
+ LOGGER.info("local module json file {} not found",
localModuleConfigPath);
+ } catch (Exception ioe) {
+ LOGGER.error("error restoredFromLocalFile {}",
localModuleConfigPath, ioe);
+ }
+ }
+
+ public void saveToLocalFile(String confPath) {
+ File temp = new File(confPath);
+ if (!temp.exists()) {
+ temp.mkdirs();
+ }
+ File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE);
+ try (BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(new FileOutputStream(jsonPath),
StandardCharsets.UTF_8))) {
+ String curConfig =
GSON.toJson(ConfigResult.builder().md5(currentMd5).moduleNum(currentModules.size())
+
.moduleList(currentModules.values().stream().collect(Collectors.toList())).build());
+ writer.write(curConfig);
+ writer.flush();
+ LOGGER.info("save modules to json file");
+ } catch (IOException e) {
+ LOGGER.error("saveToLocalFile error: ", e);
+ }
+ }
+
private void dealWithConfigQueue(BlockingQueue<ConfigResult> queue) {
ConfigResult config = queue.poll();
if (config == null) {
return;
}
- LOGGER.info("Deal with config {}", config);
- if (currentMd5.compareTo(config.getMd5()) == 0) {
+ LOGGER.info("deal with config {}", GSON.toJson(config));
+ if (currentMd5.equals(config.getMd5())) {
LOGGER.info("md5 no change {}, skip update", currentMd5);
return;
}
if (updateModules(config.getModuleList())) {
currentMd5 = config.getMd5();
+ saveToLocalFile(confPath);
} else {
- LOGGER.error("Update modules failed!");
+ LOGGER.error("update modules failed!");
}
}
- private boolean updateModules(List<ModuleConfig> modules) {
+ private boolean updateModules(List<ModuleConfig> managerModuleList) {
+ Map<Integer, ModuleConfig> modulesFromManager = new
ConcurrentHashMap<>();
+ managerModuleList.forEach((moduleConfig) -> {
+ modulesFromManager.put(moduleConfig.getId(), moduleConfig);
+ });
return true;
}
@Override
public void start() throws Exception {
+ httpManager = getHttpManager(conf);
submitWorker(coreThread());
}
@@ -166,7 +231,6 @@ public class ModuleManager extends AbstractDaemon {
*/
@Override
public void stop() throws Exception {
-
- LOGGER.info("stopping installer manager");
+ waitForTerminate();
}
}