This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ac3afcf2c [INLONG-10650][Agent] When the installer updates the 
configuration, it first checks the version (#10654)
8ac3afcf2c is described below

commit 8ac3afcf2c3be7a961d0e3b5aaa602dfa5e2d787
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Jul 18 16:12:20 2024 +0800

    [INLONG-10650][Agent] When the installer updates the configuration, it 
first checks the version (#10654)
---
 .../inlong/agent/installer/ManagerFetcher.java     |   3 +-
 .../inlong/agent/installer/ModuleManager.java      | 113 +++++++++++++--------
 .../src/test/java/installer/TestModuleManager.java |   2 +-
 .../src/test/resources/conf/modules.json           |  27 ++++-
 4 files changed, 97 insertions(+), 48 deletions(-)

diff --git 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
index 222d2e9d83..9705d8a7fd 100644
--- 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
+++ 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ManagerFetcher.java
@@ -124,7 +124,8 @@ public class ManagerFetcher extends AbstractDaemon 
implements ProfileFetcher {
             while (isRunnable()) {
                 try {
                     ConfigResult config = getConfig();
-                    if (config != null && 
config.getCode().equals(AgentResponseCode.SUCCESS)) {
+                    if (config != null && 
config.getCode().equals(AgentResponseCode.SUCCESS)
+                            && manager.getModuleManager().getCurrentVersion() 
< config.getVersion()) {
                         manager.getModuleManager().submitConfig(config);
                     }
                 } catch (Throwable ex) {
diff --git 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
index c79d31f753..cb92576c3c 100755
--- 
a/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
+++ 
b/inlong-agent/agent-installer/src/main/java/org/apache/inlong/agent/installer/ModuleManager.java
@@ -84,6 +84,7 @@ public class ModuleManager extends AbstractDaemon {
     private final String confPath;
     private final BlockingQueue<ConfigResult> configQueue;
     private String currentMd5 = "";
+    private Integer currentVersion = -1;
     private Map<Integer, ModuleConfig> currentModules = new 
ConcurrentHashMap<>();
     private static final GsonBuilder gsonBuilder = new 
GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss");
     private static final Gson GSON = gsonBuilder.create();
@@ -135,6 +136,10 @@ public class ModuleManager extends AbstractDaemon {
             LOGGER.error("modules md5 should not be null!");
             return false;
         }
+        if (config.getVersion() == null) {
+            LOGGER.error("modules version should not be null!");
+            return false;
+        }
         if (config.getModuleList().isEmpty()) {
             LOGGER.error("module list should not be empty!");
             return false;
@@ -218,6 +223,10 @@ public class ModuleManager extends AbstractDaemon {
         return currentMd5;
     }
 
+    public Integer getCurrentVersion() {
+        return currentVersion;
+    }
+
     public ModuleConfig getModule(Integer moduleId) {
         return currentModules.get(moduleId);
     }
@@ -254,8 +263,13 @@ public class ModuleManager extends AbstractDaemon {
                 new FileInputStream(localModuleConfigPath), 
StandardCharsets.UTF_8)) {
             JsonElement tmpElement = 
JsonParser.parseReader(reader).getAsJsonObject();
             ConfigResult curConfig = 
GSON.fromJson(tmpElement.getAsJsonObject(), ConfigResult.class);
-            if (curConfig.getMd5() != null && curConfig.getModuleList() != 
null) {
-                currentMd5 = curConfig.getMd5();
+            if (curConfig.getModuleList() != null) {
+                if (curConfig.getMd5() != null) {
+                    currentMd5 = curConfig.getMd5();
+                }
+                if (curConfig.getVersion() != null) {
+                    currentVersion = curConfig.getVersion();
+                }
                 curConfig.getModuleList().forEach((module) -> {
                     currentModules.put(module.getId(), module);
                 });
@@ -277,7 +291,7 @@ public class ModuleManager extends AbstractDaemon {
         File jsonPath = new File(temp.getPath() + "/" + LOCAL_CONFIG_FILE);
         try (BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(new FileOutputStream(jsonPath), 
StandardCharsets.UTF_8))) {
-            String curConfig = 
GSON.toJson(ConfigResult.builder().md5(currentMd5)
+            String curConfig = 
GSON.toJson(ConfigResult.builder().md5(currentMd5).version(currentVersion)
                     
.moduleList(currentModules.values().stream().collect(Collectors.toList())).build());
             writer.write(curConfig);
             writer.flush();
@@ -299,6 +313,7 @@ public class ModuleManager extends AbstractDaemon {
         }
         if (updateModules(config.getModuleList())) {
             currentMd5 = config.getMd5();
+            currentVersion = config.getVersion();
             saveToLocalFile(confPath);
         } else {
             LOGGER.error("update modules failed!");
@@ -308,13 +323,14 @@ public class ModuleManager extends AbstractDaemon {
     private void checkModules() {
         LOGGER.info("check modules start");
         currentModules.values().forEach((module) -> {
-            LOGGER.info("check module current state {} {}", module.getName(), 
module.getState());
+            LOGGER.info("check module {}({}) current state {}", 
module.getId(), module.getName(), module.getState());
             switch (module.getState()) {
                 case NEW:
                     if (downloadModule(module)) {
                         saveModuleState(module.getId(), 
ModuleStateEnum.DOWNLOADED);
                     } else {
-                        LOGGER.error("download module {} failed, keep state in 
new", module.getName());
+                        LOGGER.error("download module {}({}) failed, keep 
state in new", module.getId(),
+                                module.getName());
                     }
                     break;
                 case DOWNLOADED:
@@ -323,22 +339,24 @@ public class ModuleManager extends AbstractDaemon {
                         saveModuleState(module.getId(), 
ModuleStateEnum.INSTALLED);
                     } else {
                         LOGGER.info(
-                                "check module {} package failed, change stated 
to new, will download package again",
-                                module.getName());
+                                "check module {}({}) package failed, change 
stated to new, will download package again",
+                                module.getId(), module.getName());
                         saveModuleState(module.getId(), ModuleStateEnum.NEW);
                     }
                     break;
                 case INSTALLED:
                     if (!isProcessAllStarted(module)) {
-                        LOGGER.info("module {} process not all started try to 
start", module.getName());
+                        LOGGER.info("module {}({}) process not all started try 
to start", module.getId(),
+                                module.getName());
                         if (!startModule(module)) {
-                            LOGGER.info("start module {} failed, change state 
to downloaded", module.getState());
+                            LOGGER.info("start module {}({}) failed, change 
state to downloaded", module.getId(),
+                                    module.getName());
                             saveModuleState(module.getId(), 
ModuleStateEnum.DOWNLOADED);
                         }
                     }
                     break;
                 default:
-                    LOGGER.error("module {} invalid state {}", 
module.getName(), module.getState());
+                    LOGGER.error("module {}({}) invalid state {}", 
module.getId(), module.getName(), module.getState());
             }
         });
         LOGGER.info("check modules end");
@@ -358,15 +376,15 @@ public class ModuleManager extends AbstractDaemon {
         modulesFromManager.values().forEach((managerModule) -> {
             ModuleConfig localModule = 
currentModules.get(managerModule.getId());
             if (localModule == null) {
-                LOGGER.info("traverseManagerModulesToLocal module {} {} {} not 
found in local, add it",
+                LOGGER.info("traverseManagerModulesToLocal module {}({}) {} 
not found in local, add it",
                         managerModule.getId(), managerModule.getName(), 
managerModule.getVersion());
                 addModule(managerModule);
             } else {
                 if (managerModule.getMd5().equals(localModule.getMd5())) {
-                    LOGGER.info("traverseManagerModulesToLocal module {} {} {} 
md5 no change, do nothing",
+                    LOGGER.info("traverseManagerModulesToLocal module {}({}) 
{} md5 no change, do nothing",
                             localModule.getId(), localModule.getName(), 
localModule.getVersion());
                 } else {
-                    LOGGER.info("traverseManagerModulesToLocal module {} {} {} 
md5 changed, update it",
+                    LOGGER.info("traverseManagerModulesToLocal module {}({}) 
{} md5 changed, update it",
                             localModule.getId(), localModule.getName(), 
localModule.getVersion());
                     updateModule(localModule, managerModule);
                 }
@@ -378,7 +396,7 @@ public class ModuleManager extends AbstractDaemon {
         currentModules.values().forEach((localModule) -> {
             ModuleConfig managerModule = 
modulesFromManager.get(localModule.getId());
             if (managerModule == null) {
-                LOGGER.info("traverseLocalModulesToManager module {} {} {} not 
found in local, delete it",
+                LOGGER.info("traverseLocalModulesToManager module {}({}) {} 
not found in local, delete it",
                         localModule.getId(), localModule.getName(), 
localModule.getVersion());
                 deleteModule(localModule);
             }
@@ -386,46 +404,49 @@ public class ModuleManager extends AbstractDaemon {
     }
 
     private void addModule(ModuleConfig module) {
-        LOGGER.info("add module {} start", module.getName());
+        LOGGER.info("add module {}({}) start", module.getId(), 
module.getName());
         addAndSaveModuleConfig(module);
         if (!downloadModule(module)) {
-            LOGGER.error("add module {} but download failed", 
module.getName());
+            LOGGER.error("add module {}({}) but download failed", 
module.getId(), module.getName());
             return;
         }
         saveModuleState(module.getId(), ModuleStateEnum.DOWNLOADED);
         installModule(module);
         saveModuleState(module.getId(), ModuleStateEnum.INSTALLED);
         startModule(module);
-        LOGGER.info("add module {} end", module.getId());
+        LOGGER.info("add module {}({}) end", module.getId(), module.getName());
     }
 
     private void deleteModule(ModuleConfig module) {
-        LOGGER.info("delete module {} start", module.getId());
+        LOGGER.info("delete module {}({}) start", module.getId(), 
module.getName());
         stopModule(module);
         uninstallModule(module);
         deleteAndSaveModuleConfig(module);
-        LOGGER.info("delete module {} end", module.getId());
+        LOGGER.info("delete module {}({}) end", module.getId(), 
module.getName());
     }
 
     private void updateModule(ModuleConfig localModule, ModuleConfig 
managerModule) {
-        LOGGER.info("update module {} start", localModule.getId());
+        LOGGER.info("update module {}({}) start", localModule.getId(), 
localModule.getName());
         if 
(localModule.getPackageConfig().getMd5().equals(managerModule.getPackageConfig().getMd5()))
 {
-            LOGGER.info("module {} package md5 no change, will restart and 
save config", localModule.getId());
+            LOGGER.info("module {}({}) package md5 no change, will restart and 
save config", localModule.getId(),
+                    localModule.getName());
             restartModule(localModule, managerModule);
             managerModule.setState(localModule.getState());
             updateModuleConfig(managerModule);
         } else {
-            LOGGER.info("module {} package md5 changed, will reinstall", 
localModule.getId());
+            LOGGER.info("module {}({}) package md5 changed, will reinstall", 
localModule.getId(),
+                    localModule.getName());
             deleteModule(localModule);
             addModule(managerModule);
         }
-        LOGGER.info("update module {} end", localModule.getId());
+        LOGGER.info("update module {}({}) end", localModule.getId(), 
localModule.getName());
     }
 
     private void addAndSaveModuleConfig(ModuleConfig module) {
         module.setState(ModuleStateEnum.NEW);
         if (currentModules.containsKey(module.getId())) {
-            LOGGER.error("should not happen! module {} found! will force to 
replace it!", module.getId());
+            LOGGER.error("should not happen! module {}({}) found! will force 
to replace it!", module.getId(),
+                    module.getName());
         }
         currentModules.put(module.getId(), module);
         saveToLocalFile(confPath);
@@ -433,7 +454,7 @@ public class ModuleManager extends AbstractDaemon {
 
     private void deleteAndSaveModuleConfig(ModuleConfig module) {
         if (!currentModules.containsKey(module.getId())) {
-            LOGGER.error("should not happen! module {} not found!", 
module.getId());
+            LOGGER.error("should not happen! module {}({}) not found!", 
module.getId(), module.getName());
             return;
         }
         currentModules.remove(module.getId());
@@ -453,7 +474,7 @@ public class ModuleManager extends AbstractDaemon {
         }
         module.setState(state);
         saveToLocalFile(confPath);
-        LOGGER.info("save module state to {} {}", moduleId, state);
+        LOGGER.info("save module {}({}) state to {}", module.getId(), 
module.getName(), state);
         return true;
     }
 
@@ -463,42 +484,43 @@ public class ModuleManager extends AbstractDaemon {
     }
 
     private void installModule(ModuleConfig module) {
-        LOGGER.info("install module {} with cmd {}", module.getId(), 
module.getInstallCommand());
+        LOGGER.info("install module {}({}) with cmd {}", module.getId(), 
module.getName(), module.getInstallCommand());
         String ret = ExcuteLinux.exeCmd(module.getInstallCommand());
-        LOGGER.info("install module {} return {} ", module.getId(), ret);
+        LOGGER.info("install module {}({}) return {} ", module.getId(), 
module.getName(), ret);
     }
 
     private boolean startModule(ModuleConfig module) {
-        LOGGER.info("start module {} with cmd {}", module.getId(), 
module.getStartCommand());
+        LOGGER.info("start module {}({}) with cmd {}", module.getId(), 
module.getName(), module.getStartCommand());
         for (int i = 0; i < module.getProcessesNum(); i++) {
             String ret = ExcuteLinux.exeCmd(module.getStartCommand());
-            LOGGER.info("start [{}] module {} return {} ", i, module.getId(), 
ret);
+            LOGGER.info("start module {}({}) proc[{}] return {} ", 
module.getId(), module.getName(), i, ret);
         }
         if (isProcessAllStarted(module)) {
-            LOGGER.info("start module {} success", module.getId());
+            LOGGER.info("start module {}({}) success", module.getId(), 
module.getName());
             return true;
         } else {
-            LOGGER.info("start module {} failed", module.getId());
+            LOGGER.info("start module {}({}) failed", module.getId(), 
module.getName());
             return false;
         }
     }
 
     private void stopModule(ModuleConfig module) {
-        LOGGER.info("stop module {} with cmd {}", module.getId(), 
module.getStopCommand());
+        LOGGER.info("stop module {}({}) with cmd {}", module.getId(), 
module.getName(), module.getStopCommand());
         String ret = ExcuteLinux.exeCmd(module.getStopCommand());
-        LOGGER.info("stop module {} return {} ", module.getId(), ret);
+        LOGGER.info("stop module {}({}) return {} ", module.getId(), 
module.getName(), ret);
     }
 
     private void uninstallModule(ModuleConfig module) {
-        LOGGER.info("uninstall module {} with cmd {}", module.getId(), 
module.getUninstallCommand());
+        LOGGER.info("uninstall module {}({}) with cmd {}", module.getId(), 
module.getName(),
+                module.getUninstallCommand());
         String ret = ExcuteLinux.exeCmd(module.getUninstallCommand());
-        LOGGER.info("uninstall module {} return {} ", module.getId(), ret);
+        LOGGER.info("uninstall module {}({}) return {} ", module.getId(), 
module.getName(), ret);
     }
 
     private boolean isProcessAllStarted(ModuleConfig module) {
         String ret = ExcuteLinux.exeCmd(module.getCheckCommand());
         if (ret == null) {
-            LOGGER.error("get module process num {} failed", module.getName());
+            LOGGER.error("get module {}({}) process num failed", 
module.getId(), module.getName());
             return false;
         }
         String[] processArray = ret.split("\n");
@@ -508,12 +530,12 @@ public class ModuleManager extends AbstractDaemon {
                 cnt++;
             }
         }
-        LOGGER.info("get module process num {} {}", module.getName(), cnt);
+        LOGGER.info("get module {}({}) process num {}", module.getId(), 
module.getName(), cnt);
         return cnt >= module.getProcessesNum();
     }
 
     private boolean downloadModule(ModuleConfig module) {
-        LOGGER.info("download module {} begin with url {}", module.getId(),
+        LOGGER.info("download module {}({}) begin with url {}", 
module.getId(), module.getName(),
                 module.getPackageConfig().getDownloadUrl());
         try {
             URL url = new URL(module.getPackageConfig().getDownloadUrl());
@@ -526,7 +548,7 @@ public class ModuleManager extends AbstractDaemon {
                     module.getPackageConfig().getStoragePath() + "/" + 
module.getPackageConfig().getFileName();
             try (InputStream inputStream = conn.getInputStream();
                     FileOutputStream outputStream = new 
FileOutputStream(path)) {
-                LOGGER.info("save path {}", path);
+                LOGGER.info("module {}({}) save path {}", module.getId(), 
module.getName(), path);
                 int byteRead;
                 byte[] buffer = new byte[DOWNLOAD_PACKAGE_READ_BUFF_SIZE];
                 while ((byteRead = inputStream.read(buffer)) != -1) {
@@ -536,15 +558,15 @@ public class ModuleManager extends AbstractDaemon {
             if (isPackageDownloaded(module)) {
                 return true;
             } else {
-                LOGGER.error("download package md5 not match!");
+                LOGGER.error("download module {}({}) package md5 not match!", 
module.getId(), module.getName());
                 return false;
             }
         } catch (FileNotFoundException e) {
-            LOGGER.error("download module err", e);
+            LOGGER.error("download module {}({}) err", module.getId(), 
module.getName(), e);
         } catch (IOException e) {
-            LOGGER.error("download module err", e);
+            LOGGER.error("download module {}({}) err", module.getId(), 
module.getName(), e);
         }
-        LOGGER.info("download module {} end", module.getId());
+        LOGGER.info("download module {}({}) end", module.getId(), 
module.getName());
         return false;
     }
 
@@ -554,7 +576,8 @@ public class ModuleManager extends AbstractDaemon {
         if (Objects.equals(fileMd5, module.getPackageConfig().getMd5())) {
             return true;
         } else {
-            LOGGER.error("md5 not match! fileMd5 {} moduleMd5 {}", fileMd5, 
module.getPackageConfig().getMd5());
+            LOGGER.error("module {}({}) md5 not match! fileMd5 {} moduleMd5 
{}", module.getId(), module.getName(),
+                    fileMd5, module.getPackageConfig().getMd5());
             return false;
         }
     }
diff --git 
a/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java 
b/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
index e5ccbd8cd3..ecd2383bf0 100755
--- 
a/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
+++ 
b/inlong-agent/agent-installer/src/test/java/installer/TestModuleManager.java
@@ -178,7 +178,7 @@ public class TestModuleManager {
                 "echo empty uninstall cmd", 
"agent-release-1.13.0-SNAPSHOT-bin.tar.gz",
                 
"http://11.151.252.111:8083/inlong/manager/openapi/agent/download/agent-release-1.13.0-SNAPSHOT-bin.tar.gz";,
                 NEW_MD5));
-        return 
ConfigResult.builder().moduleList(configs).md5("config-result-md5-193603").build();
+        return 
ConfigResult.builder().moduleList(configs).md5("config-result-md5-193603").version(1).build();
     }
 
     private ModuleConfig getModuleConfig(int id, String name, String md5, 
String version, Integer procNum,
diff --git a/inlong-agent/agent-installer/src/test/resources/conf/modules.json 
b/inlong-agent/agent-installer/src/test/resources/conf/modules.json
index 1b365c587c..a6190de743 100644
--- a/inlong-agent/agent-installer/src/test/resources/conf/modules.json
+++ b/inlong-agent/agent-installer/src/test/resources/conf/modules.json
@@ -1 +1,26 @@
-{"md5":"b38c63451ff966a32994a867ec79d259","moduleNum":1,"moduleList":[{"id":1,"name":"inlong-agent","md5":"55175a3b2cb143f31ad3d79e081e794c","version":"1.0","processesNum":1,"startCommand":"cd
 ~/inlong-agent/bin;sh agent.sh start","stopCommand":"cd ~/inlong-agent/bin;sh 
agent.sh stop","checkCommand":"ps aux | grep core.AgentMain | grep java | grep 
-v grep | awk \u0027{print $2}\u0027","installCommand":"cd 
~/inlong-agent/bin;sh agent.sh stop;rm -rf ~/inlong-agent/;mkdir 
~/inlong-agent;cd  [...]
\ No newline at end of file
+{
+  "md5": "b38c63451ff966a32994a867ec79d259",
+  "version": 1,
+  "moduleNum": 1,
+  "moduleList": [
+    {
+      "id": 1,
+      "name": "inlong-agent",
+      "md5": "55175a3b2cb143f31ad3d79e081e794c",
+      "version": "1.0",
+      "processesNum": 1,
+      "startCommand": "cd ~/inlong-agent/bin;sh agent.sh start",
+      "stopCommand": "cd ~/inlong-agent/bin;sh agent.sh stop",
+      "checkCommand": "ps aux | grep core.AgentMain | grep java | grep -v grep 
| awk \u0027{print $2}\u0027",
+      "installCommand": "cd ~/inlong-agent/bin;sh agent.sh stop;rm -rf 
~/inlong-agent/;mkdir ~/inlong-agent;cd /tmp;tar -xzvf 
apache-inlong-agent-1.13.0-SNAPSHOT-bin.tar.gz -C ~/inlong-agent;cd 
~/inlong-agent/bin;sh agent.sh start",
+      "uninstallCommand": "echo empty uninstall cmd",
+      "packageConfig": {
+        "md5": "95648c83b45971dce503d5d844496cfc",
+        "fileName": "apache-inlong-agent-1.13.0-SNAPSHOT-bin.tar.gz",
+        "downloadUrl": 
"http://11.151.246.158:8083/inlong/manager/openapi/installer/download?filename\u003dapache-inlong-agent-1.13.0-SNAPSHOT-bin.tar.gz";,
+        "storagePath": "/tmp"
+      },
+      "state": "INSTALLED"
+    }
+  ]
+}
\ No newline at end of file

Reply via email to