This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8ac3afcf2c [INLONG-10650][Agent] When the installer updates the
configuration, it first checks the version (#10654)
8ac3afcf2c is described below
commit 8ac3afcf2c3be7a961d0e3b5aaa602dfa5e2d787
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jul 18 16:12:20 2024 +0800
[INLONG-10650][Agent] When the installer updates the configuration, it
first checks the version (#10654)
---
.../inlong/agent/installer/ManagerFetcher.java | 3 +-
.../inlong/agent/installer/ModuleManager.java | 113 +++++++++++++--------
.../src/test/java/installer/TestModuleManager.java | 2 +-
.../src/test/resources/conf/modules.json | 27 ++++-
4 files changed, 97 insertions(+), 48 deletions(-)
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
index 222d2e9d83..9705d8a7fd 100644
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
@@ -124,7 +124,8 @@ public class ManagerFetcher extends AbstractDaemon
implements ProfileFetcher {
while (isRunnable()) {
try {
ConfigResult config = getConfig();
- if (config != null &&
config.getCode().equals(AgentResponseCode.SUCCESS)) {
+ if (config != null &&
config.getCode().equals(AgentResponseCode.SUCCESS)
+ && manager.getModuleManager().getCurrentVersion()
< config.getVersion()) {
manager.getModuleManager().submitConfig(config);
}
} catch (Throwable ex) {
diff --git
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index c79d31f753..cb92576c3c 100755
---
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -84,6 +84,7 @@ public class ModuleManager extends AbstractDaemon {
private final String confPath;
private final BlockingQueue<ConfigResult> configQueue;
private String currentMd5 = "";
+ private Integer currentVersion = -1;
private Map<Integer, ModuleConfig> currentModules = new
ConcurrentHashMap<>();
private static final GsonBuilder gsonBuilder = new
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Gson GSON = gsonBuilder.create();
@@ -135,6 +136,10 @@ public class ModuleManager extends AbstractDaemon {
LOGGER.error("modules md5 should not be null!");
return false;
}
+ if (config.getVersion() == null) {
+ LOGGER.error("modules version should not be null!");
+ return false;
+ }
if (config.getModuleList().isEmpty()) {
LOGGER.error("module list should not be empty!");
return false;
@@ -218,6 +223,10 @@ public class ModuleManager extends AbstractDaemon {
return currentMd5;
}
+ public Integer getCurrentVersion() {
+ return currentVersion;
+ }
+
public ModuleConfig getModule(Integer moduleId) {
return currentModules.get(moduleId);
}
@@ -254,8 +263,13 @@ public class ModuleManager extends AbstractDaemon {
new FileInputStream(localModuleConfigPath),
StandardCharsets.UTF_8)) {
JsonElement tmpElement =
JsonParser.parseReader(reader).getAsJsonObject();
ConfigResult curConfig =
GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class);
- if (curConfig.getMd5() != null && curConfig.getModuleList() !=
null) {
- currentMd5 = curConfig.getMd5();
+ if (curConfig.getModuleList() != null) {
+ if (curConfig.getMd5() != null) {
+ currentMd5 = curConfig.getMd5();
+ }
+ if (curConfig.getVersion() != null) {
+ currentVersion = curConfig.getVersion();
+ }
curConfig.getModuleList().forEach((module) -> {
currentModules.put(module.getId(), module);
});
@@ -277,7 +291,7 @@ public class ModuleManager extends AbstractDaemon {
File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE);
try (BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(jsonPath),
StandardCharsets.UTF_8))) {
- String curConfig =
GSON.toJson(ConfigResult.builder().md5(currentMd5)
+ String curConfig =
GSON.toJson(ConfigResult.builder().md5(currentMd5).version(currentVersion)
.moduleList(currentModules.values().stream().collect(Collectors.toList())).build());
writer.write(curConfig);
writer.flush();
@@ -299,6 +313,7 @@ public class ModuleManager extends AbstractDaemon {
}
if (updateModules(config.getModuleList())) {
currentMd5 = config.getMd5();
+ currentVersion = config.getVersion();
saveToLocalFile(confPath);
} else {
LOGGER.error("update modules failed!");
@@ -308,13 +323,14 @@ public class ModuleManager extends AbstractDaemon {
private void checkModules() {
LOGGER.info("check modules start");
currentModules.values().forEach((module) -> {
- LOGGER.info("check module current state {} {}", module.getName(),
module.getState());
+ LOGGER.info("check module {}({}) current state {}",
module.getId(), module.getName(), module.getState());
switch (module.getState()) {
case NEW:
if (downloadModule(module)) {
saveModuleState(module.getId(),
ModuleStateEnum.DOWNLOADED);
} else {
- LOGGER.error("download module {} failed, keep state in
new", module.getName());
+ LOGGER.error("download module {}({}) failed, keep
state in new", module.getId(),
+ module.getName());
}
break;
case DOWNLOADED:
@@ -323,22 +339,24 @@ public class ModuleManager extends AbstractDaemon {
saveModuleState(module.getId(),
ModuleStateEnum.INSTALLED);
} else {
LOGGER.info(
- "check module {} package failed, change stated
to new, will download package again",
- module.getName());
+ "check module {}({}) package failed, change
stated to new, will download package again",
+ module.getId(), module.getName());
saveModuleState(module.getId(), ModuleStateEnum.NEW);
}
break;
case INSTALLED:
if (!isProcessAllStarted(module)) {
- LOGGER.info("module {} process not all started try to
start", module.getName());
+ LOGGER.info("module {}({}) process not all started try
to start", module.getId(),
+ module.getName());
if (!startModule(module)) {
- LOGGER.info("start module {} failed, change state
to downloaded", module.getState());
+ LOGGER.info("start module {}({}) failed, change
state to downloaded", module.getId(),
+ module.getName());
saveModuleState(module.getId(),
ModuleStateEnum.DOWNLOADED);
}
}
break;
default:
- LOGGER.error("module {} invalid state {}",
module.getName(), module.getState());
+ LOGGER.error("module {}({}) invalid state {}",
module.getId(), module.getName(), module.getState());
}
});
LOGGER.info("check modules end");
@@ -358,15 +376,15 @@ public class ModuleManager extends AbstractDaemon {
modulesFromManager.values().forEach((managerModule) -> {
ModuleConfig localModule =
currentModules.get(managerModule.getId());
if (localModule == null) {
- LOGGER.info("traverseManagerModulesToLocal module {} {} {} not
found in local, add it",
+ LOGGER.info("traverseManagerModulesToLocal module {}({}) {}
not found in local, add it",
managerModule.getId(), managerModule.getName(),
managerModule.getVersion());
addModule(managerModule);
} else {
if (managerModule.getMd5().equals(localModule.getMd5())) {
- LOGGER.info("traverseManagerModulesToLocal module {} {} {}
md5 no change, do nothing",
+ LOGGER.info("traverseManagerModulesToLocal module {}({})
{} md5 no change, do nothing",
localModule.getId(), localModule.getName(),
localModule.getVersion());
} else {
- LOGGER.info("traverseManagerModulesToLocal module {} {} {}
md5 changed, update it",
+ LOGGER.info("traverseManagerModulesToLocal module {}({})
{} md5 changed, update it",
localModule.getId(), localModule.getName(),
localModule.getVersion());
updateModule(localModule, managerModule);
}
@@ -378,7 +396,7 @@ public class ModuleManager extends AbstractDaemon {
currentModules.values().forEach((localModule) -> {
ModuleConfig managerModule =
modulesFromManager.get(localModule.getId());
if (managerModule == null) {
- LOGGER.info("traverseLocalModulesToManager module {} {} {} not
found in local, delete it",
+ LOGGER.info("traverseLocalModulesToManager module {}({}) {}
not found in local, delete it",
localModule.getId(), localModule.getName(),
localModule.getVersion());
deleteModule(localModule);
}
@@ -386,46 +404,49 @@ public class ModuleManager extends AbstractDaemon {
}
private void addModule(ModuleConfig module) {
- LOGGER.info("add module {} start", module.getName());
+ LOGGER.info("add module {}({}) start", module.getId(),
module.getName());
addAndSaveModuleConfig(module);
if (!downloadModule(module)) {
- LOGGER.error("add module {} but download failed",
module.getName());
+ LOGGER.error("add module {}({}) but download failed",
module.getId(), module.getName());
return;
}
saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
installModule(module);
saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
startModule(module);
- LOGGER.info("add module {} end", module.getId());
+ LOGGER.info("add module {}({}) end", module.getId(), module.getName());
}
private void deleteModule(ModuleConfig module) {
- LOGGER.info("delete module {} start", module.getId());
+ LOGGER.info("delete module {}({}) start", module.getId(),
module.getName());
stopModule(module);
uninstallModule(module);
deleteAndSaveModuleConfig(module);
- LOGGER.info("delete module {} end", module.getId());
+ LOGGER.info("delete module {}({}) end", module.getId(),
module.getName());
}
private void updateModule(ModuleConfig localModule, ModuleConfig
managerModule) {
- LOGGER.info("update module {} start", localModule.getId());
+ LOGGER.info("update module {}({}) start", localModule.getId(),
localModule.getName());
if
(localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5()))
{
- LOGGER.info("module {} package md5 no change, will restart and
save config", localModule.getId());
+ LOGGER.info("module {}({}) package md5 no change, will restart and
save config", localModule.getId(),
+ localModule.getName());
restartModule(localModule, managerModule);
managerModule.setState(localModule.getState());
updateModuleConfig(managerModule);
} else {
- LOGGER.info("module {} package md5 changed, will reinstall",
localModule.getId());
+ LOGGER.info("module {}({}) package md5 changed, will reinstall",
localModule.getId(),
+ localModule.getName());
deleteModule(localModule);
addModule(managerModule);
}
- LOGGER.info("update module {} end", localModule.getId());
+ LOGGER.info("update module {}({}) end", localModule.getId(),
localModule.getName());
}
private void addAndSaveModuleConfig(ModuleConfig module) {
module.setState(ModuleStateEnum.NEW);
if (currentModules.containsKey(module.getId())) {
- LOGGER.error("should not happen! module {} found! will force to
replace it!", module.getId());
+ LOGGER.error("should not happen! module {}({}) found! will force
to replace it!", module.getId(),
+ module.getName());
}
currentModules.put(module.getId(), module);
saveToLocalFile(confPath);
@@ -433,7 +454,7 @@ public class ModuleManager extends AbstractDaemon {
private void deleteAndSaveModuleConfig(ModuleConfig module) {
if (!currentModules.containsKey(module.getId())) {
- LOGGER.error("should not happen! module {} not found!",
module.getId());
+ LOGGER.error("should not happen! module {}({}) not found!",
module.getId(), module.getName());
return;
}
currentModules.remove(module.getId());
@@ -453,7 +474,7 @@ public class ModuleManager extends AbstractDaemon {
}
module.setState(state);
saveToLocalFile(confPath);
- LOGGER.info("save module state to {} {}", moduleId, state);
+ LOGGER.info("save module {}({}) state to {}", module.getId(),
module.getName(), state);
return true;
}
@@ -463,42 +484,43 @@ public class ModuleManager extends AbstractDaemon {
}
private void installModule(ModuleConfig module) {
- LOGGER.info("install module {} with cmd {}", module.getId(),
module.getInstallCommand());
+ LOGGER.info("install module {}({}) with cmd {}", module.getId(),
module.getName(), module.getInstallCommand());
String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
- LOGGER.info("install module {} return {} ", module.getId(), ret);
+ LOGGER.info("install module {}({}) return {} ", module.getId(),
module.getName(), ret);
}
private boolean startModule(ModuleConfig module) {
- LOGGER.info("start module {} with cmd {}", module.getId(),
module.getStartCommand());
+ LOGGER.info("start module {}({}) with cmd {}", module.getId(),
module.getName(), module.getStartCommand());
for (int i = 0; i < module.getProcessesNum(); i++) {
String ret = ExcuteLinux.exeCmd(module.getStartCommand());
- LOGGER.info("start [{}] module {} return {} ", i, module.getId(),
ret);
+ LOGGER.info("start module {}({}) proc[{}] return {} ",
module.getId(), module.getName(), i, ret);
}
if (isProcessAllStarted(module)) {
- LOGGER.info("start module {} success", module.getId());
+ LOGGER.info("start module {}({}) success", module.getId(),
module.getName());
return true;
} else {
- LOGGER.info("start module {} failed", module.getId());
+ LOGGER.info("start module {}({}) failed", module.getId(),
module.getName());
return false;
}
}
private void stopModule(ModuleConfig module) {
- LOGGER.info("stop module {} with cmd {}", module.getId(),
module.getStopCommand());
+ LOGGER.info("stop module {}({}) with cmd {}", module.getId(),
module.getName(), module.getStopCommand());
String ret = ExcuteLinux.exeCmd(module.getStopCommand());
- LOGGER.info("stop module {} return {} ", module.getId(), ret);
+ LOGGER.info("stop module {}({}) return {} ", module.getId(),
module.getName(), ret);
}
private void uninstallModule(ModuleConfig module) {
- LOGGER.info("uninstall module {} with cmd {}", module.getId(),
module.getUninstallCommand());
+ LOGGER.info("uninstall module {}({}) with cmd {}", module.getId(),
module.getName(),
+ module.getUninstallCommand());
String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
- LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
+ LOGGER.info("uninstall module {}({}) return {} ", module.getId(),
module.getName(), ret);
}
private boolean isProcessAllStarted(ModuleConfig module) {
String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
if (ret == null) {
- LOGGER.error("get module process num {} failed", module.getName());
+ LOGGER.error("get module {}({}) process num failed",
module.getId(), module.getName());
return false;
}
String[] processArray = ret.split("\n");
@@ -508,12 +530,12 @@ public class ModuleManager extends AbstractDaemon {
cnt++;
}
}
- LOGGER.info("get module process num {} {}", module.getName(), cnt);
+ LOGGER.info("get module {}({}) process num {}", module.getId(),
module.getName(), cnt);
return cnt >= module.getProcessesNum();
}
private boolean downloadModule(ModuleConfig module) {
- LOGGER.info("download module {} begin with url {}", module.getId(),
+ LOGGER.info("download module {}({}) begin with url {}",
module.getId(), module.getName(),
module.getPackageConfig().getDownloadUrl());
try {
URL url = new URL(module.getPackageConfig().getDownloadUrl());
@@ -526,7 +548,7 @@ public class ModuleManager extends AbstractDaemon {
module.getPackageConfig().getStoragePath() + "/" +
module.getPackageConfig().getFileName();
try (InputStream inputStream = conn.getInputStream();
FileOutputStream outputStream = new
FileOutputStream(path)) {
- LOGGER.info("save path {}", path);
+ LOGGER.info("module {}({}) save path {}", module.getId(),
module.getName(), path);
int byteRead;
byte[] buffer = new byte[DOWNLOAD_PACKAGE_READ_BUFF_SIZE];
while ((byteRead = inputStream.read(buffer)) != -1) {
@@ -536,15 +558,15 @@ public class ModuleManager extends AbstractDaemon {
if (isPackageDownloaded(module)) {
return true;
} else {
- LOGGER.error("download package md5 not match!");
+ LOGGER.error("download module {}({}) package md5 not match!",
module.getId(), module.getName());
return false;
}
} catch (FileNotFoundException e) {
- LOGGER.error("download module err", e);
+ LOGGER.error("download module {}({}) err", module.getId(),
module.getName(), e);
} catch (IOException e) {
- LOGGER.error("download module err", e);
+ LOGGER.error("download module {}({}) err", module.getId(),
module.getName(), e);
}
- LOGGER.info("download module {} end", module.getId());
+ LOGGER.info("download module {}({}) end", module.getId(),
module.getName());
return false;
}
@@ -554,7 +576,8 @@ public class ModuleManager extends AbstractDaemon {
if (Objects.equals(fileMd5, module.getPackageConfig().getMd5())) {
return true;
} else {
- LOGGER.error("md5 not match! fileMd5 {} moduleMd5 {}", fileMd5,
module.getPackageConfig().getMd5());
+ LOGGER.error("module {}({}) md5 not match! fileMd5 {} moduleMd5
{}", module.getId(), module.getName(),
+ fileMd5, module.getPackageConfig().getMd5());
return false;
}
}
diff --git
a/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
b/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
index e5ccbd8cd3..ecd2383bf0 100755
---
a/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
+++
b/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
@@ -178,7 +178,7 @@ public class TestModuleManager {
"echo empty uninstall cmd",
"agent-release-1.13.0-SNAPSHOT-bin.tar.gz",
"http://11.151.252.111:8083/inlong/manager/openapi/agent/download/agent-release-1.13.0-SNAPSHOT-bin.tar.gz",
NEW_MD5));
- return
ConfigResult.builder().moduleList(configs).md5("config-result-md5-193603").build();
+ return
ConfigResult.builder().moduleList(configs).md5("config-result-md5-193603").version(1).build();
}
private ModuleConfig getModuleConfig(int id, String name, String md5,
String version, Integer procNum,
diff --git a/inlong-agent/agent-installer/src/test/resources/conf/modules.json
b/inlong-agent/agent-installer/src/test/resources/conf/modules.json
index 1b365c587c..a6190de743 100644
--- a/inlong-agent/agent-installer/src/test/resources/conf/modules.json
+++ b/inlong-agent/agent-installer/src/test/resources/conf/modules.json
@@ -1 +1,26 @@
-{"md5":"b38c63451ff966a32994a867ec79d259","moduleNum":1,"moduleList":[{"id":1,"name":"inlong-agent","md5":"55175a3b2cb143f31ad3d79e081e794c","version":"1.0","processesNum":1,"startCommand":"cd
~/inlong-agent/bin;sh agent.sh start","stopCommand":"cd ~/inlong-agent/bin;sh
agent.sh stop","checkCommand":"ps aux | grep core.AgentMain | grep java | grep
-v grep | awk \u0027{print $2}\u0027","installCommand":"cd
~/inlong-agent/bin;sh agent.sh stop;rm -rf ~/inlong-agent/;mkdir
~/inlong-agent;cd [...]
\ No newline at end of file
+{
+ "md5": "b38c63451ff966a32994a867ec79d259",
+ "version": 1,
+ "moduleNum": 1,
+ "moduleList": [
+ {
+ "id": 1,
+ "name": "inlong-agent",
+ "md5": "55175a3b2cb143f31ad3d79e081e794c",
+ "version": "1.0",
+ "processesNum": 1,
+ "startCommand": "cd ~/inlong-agent/bin;sh agent.sh start",
+ "stopCommand": "cd ~/inlong-agent/bin;sh agent.sh stop",
+ "checkCommand": "ps aux | grep core.AgentMain | grep java | grep -v grep
| awk \u0027{print $2}\u0027",
+ "installCommand": "cd ~/inlong-agent/bin;sh agent.sh stop;rm -rf
~/inlong-agent/;mkdir ~/inlong-agent;cd /tmp;tar -xzvf
apache-inlong-agent-1.13.0-SNAPSHOT-bin.tar.gz -C ~/inlong-agent;cd
~/inlong-agent/bin;sh agent.sh start",
+ "uninstallCommand": "echo empty uninstall cmd",
+ "packageConfig": {
+ "md5": "95648c83b45971dce503d5d844496cfc",
+ "fileName": "apache-inlong-agent-1.13.0-SNAPSHOT-bin.tar.gz",
+ "downloadUrl":
"http://11.151.246.158:8083/inlong/manager/openapi/installer/download?filename\u003dapache-inlong-agent-1.13.0-SNAPSHOT-bin.tar.gz",
+ "storagePath": "/tmp"
+ },
+ "state": "INSTALLED"
+ }
+ ]
+}
\ No newline at end of file