This is an automated email from the ASF dual-hosted git repository.

dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fe53af feat: Implement Nacos configuration center to support dynamic 
configuration management (#626)
c1fe53af is described below

commit c1fe53afe41a9f2664bdc4b027072f28f6910009
Author: wtt <[email protected]>
AuthorDate: Wed Dec 24 14:39:40 2025 +0800

    feat: Implement Nacos configuration center to support dynamic configuration 
management (#626)
    
    * fix: solve the blocking problem caused by serverless startup
    
    * fix: fix pipeline execution logic error
    
    * feat: support dynamic configuration management
    
    * chore(license): Add Apache license header to configuration class
    
    * chore(build): update
---
 ozhera-log/log-agent-server/pom.xml                |  6 +-
 .../porcessor/AgentCollectProgressProcessor.java   | 73 +++++++++++++----
 .../service/DefaultPublishConfigService.java       | 58 +++++++++-----
 ozhera-log/log-agent/pom.xml                       |  2 +-
 .../log/agent/bootstrap/MiLogAgentBootstrap.java   | 25 ++++--
 .../log/agent/channel/ChannelServiceFactory.java   |  3 +-
 .../log/agent/config/AgentConfigManager.java       | 91 ++++++++++++++++++++++
 .../ConfigCenter.java}                             | 30 +++----
 .../ConfigChangeListener.java}                     | 23 ++----
 .../log/agent/config/nacos/NacosConfigCenter.java  | 69 ++++++++++++++++
 .../log/agent/extension/nacos/NacosConfigUtil.java |  4 +
 .../log-agent/src/main/resources/config.properties |  1 +
 12 files changed, 305 insertions(+), 80 deletions(-)

diff --git a/ozhera-log/log-agent-server/pom.xml 
b/ozhera-log/log-agent-server/pom.xml
index 5c1f9796..531ac649 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -66,7 +66,7 @@ http://www.apache.org/licenses/LICENSE-2.0
         <dependency>
             <groupId>run.mone</groupId>
             <artifactId>rpc</artifactId>
-            <version>1.6.4-jdk21-SNAPSHOT</version>
+            <version>1.6.5-jdk21-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>ch.qos.logback</groupId>
@@ -114,6 +114,10 @@ http://www.apache.org/licenses/LICENSE-2.0
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>run.mone</groupId>
+                    <artifactId>rpc</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
index a1e33526..b9c91f5f 100644
--- 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
+++ 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ozhera.log.server.porcessor;
 
+import com.google.common.util.concurrent.RateLimiter;
+import com.xiaomi.data.push.rpc.common.CompressionUtil;
 import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
 import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
 import com.xiaomi.youpin.docean.Ioc;
@@ -48,30 +50,67 @@ import static org.apache.ozhera.log.common.Constant.GSON;
 public class AgentCollectProgressProcessor implements NettyRequestProcessor {
 
     @Resource
-    DefaultLogProcessCollector processService;
+    private DefaultLogProcessCollector processService;
 
-    private static Version version = new Version();
+    private static final RateLimiter ERROR_LIMITER = RateLimiter.create(2);
+
+    private static final Version VERSION = new Version();
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, 
RemotingCommand request) throws Exception {
-        log.debug("received a message from the agent");
+        log.debug("Received a message from the agent, remote address: {}", 
getIp(ctx));
+
         RemotingCommand response = 
RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE);
-        String body = new String(request.getBody(), StandardCharsets.UTF_8);
+        response.setBody((VERSION + Constant.SUCCESS_MESSAGE).getBytes());
+
+        if (request.getBody() == null || request.getBody().length == 0) {
+            return response;
+        }
+
+        if (processService == null && 
Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
+            processService = 
Ioc.ins().getBean(DefaultLogProcessCollector.class);
+        }
+
+        UpdateLogProcessCmd cmd = parseRequestBody(request.getBody(), ctx);
+        if (cmd == null) {
+            return response;
+        }
+
+        if (processService != null) {
+            processService.collectLogProcess(cmd);
+        }
+
+        return response;
+    }
+
+    /**
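+     * Parse the request body as UTF-8 JSON; if that fails, decompress it and parse again (returns null on a blank ip or when both attempts fail)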
+     */
+    private UpdateLogProcessCmd parseRequestBody(byte[] bodyBytes, 
ChannelHandlerContext ctx) {
+        String bodyStr = null;
+
         try {
-            UpdateLogProcessCmd cmd = GSON.fromJson(body, 
UpdateLogProcessCmd.class);
-            log.debug("a request from the client sent by the agent:{}", 
cmd.getIp());
-            if (null == processService && 
Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
-                processService = 
Ioc.ins().getBean(DefaultLogProcessCollector.class);
-            }
-            if (null != processService) {
-                processService.collectLogProcess(cmd);
+            bodyStr = new String(bodyBytes, StandardCharsets.UTF_8);
+            UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, 
UpdateLogProcessCmd.class);
+            if (StringUtils.isBlank(cmd.getIp())) {
+                log.warn("Invalid agent request, ip={}, body={}", getIp(ctx), 
brief(bodyStr));
+                return null;
             }
+            log.debug("Parsed request from agent: ip={}", cmd.getIp());
+            return cmd;
+        } catch (Exception ignored) {
+        }
+
+        try {
+            bodyStr = new String(CompressionUtil.decompress(bodyBytes), 
StandardCharsets.UTF_8);
+            UpdateLogProcessCmd cmd = GSON.fromJson(bodyStr, 
UpdateLogProcessCmd.class);
+            log.debug("Parsed decompressed request from agent: ip={}", 
cmd.getIp());
+            return cmd;
         } catch (Exception e) {
-            log.error("processRequest error,ip:{},body:{}", getIp(ctx), body, 
e);
+            assert bodyStr != null;
+            log.error("processRequest error, ip={}, body={}", getIp(ctx), 
brief(bodyStr), e);
+            return null;
         }
-        response.setBody(version.toString().getBytes());
-        response.setBody(Constant.SUCCESS_MESSAGE.getBytes());
-        return response;
     }
 
     @Override
@@ -90,4 +129,8 @@ public class AgentCollectProgressProcessor implements 
NettyRequestProcessor {
         }
         return StringUtils.EMPTY;
     }
+
+    private String brief(String body) {
+        return body.length() > 200 ? body.substring(0, 200) + "..." : body;
+    }
 }
diff --git 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
index ff97e622..dfd80ae1 100644
--- 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
+++ 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
@@ -35,9 +35,10 @@ import org.apache.ozhera.log.utils.NetUtil;
 
 import javax.annotation.Resource;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.apache.ozhera.log.common.Constant.GSON;
 import static org.apache.ozhera.log.common.Constant.SYMBOL_COLON;
@@ -60,6 +61,19 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
 
     private volatile boolean configCompressValue = false;
 
+
+    private static final ExecutorService SEND_CONFIG_EXECUTOR;
+
+    static {
+        SEND_CONFIG_EXECUTOR = Executors.newThreadPerTaskExecutor(
+                Thread.ofVirtual()
+                        .name("send-config-vt-", 0)
+                        .uncaughtExceptionHandler((t, e) ->
+                                log.error("send config uncaught exception", e))
+                        .factory()
+        );
+    }
+
     public void init() {
         String raw = System.getenv(CONFIG_COMPRESS_KEY);
         if (StringUtils.isBlank(raw)) {
@@ -80,17 +94,25 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
      * dubbo interface, the timeout period cannot be too long
      *
      * @param agentIp
-     * @param logCollectMeta
+     * @param meta
      */
     @Override
-    public void sengConfigToAgent(String agentIp, LogCollectMeta 
logCollectMeta) {
+    public void sengConfigToAgent(String agentIp, LogCollectMeta meta) {
+        if (StringUtils.isBlank(agentIp) || meta == null) {
+            return;
+        }
+        doSendConfig(agentIp, meta);
+//        SEND_CONFIG_EXECUTOR.execute(() -> );
+    }
+
+    private void doSendConfig(String agentIp, LogCollectMeta meta) {
         int count = 1;
-        while (count < 4) {
+        while (count < 3) {
             Map<String, AgentChannel> logAgentMap = getAgentChannelMap();
-            String agentCurrentIp = queryCurrentDockerAgentIP(agentIp, 
logAgentMap);
+            String agentCurrentIp = getCorrectDockerAgentIP(agentIp, 
logAgentMap);
             if (logAgentMap.containsKey(agentCurrentIp)) {
-                String sendStr = GSON.toJson(logCollectMeta);
-                if 
(CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) {
+                String sendStr = GSON.toJson(meta);
+                if (CollectionUtils.isNotEmpty(meta.getAppLogMetaList())) {
                     RemotingCommand req = 
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
                     req.setBody(sendStr.getBytes());
 
@@ -103,17 +125,17 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
                     RemotingCommand res = 
rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000);
                     started.stop();
                     String response = new String(res.getBody());
-                    log.info("The configuration is sent 
successfully---->{},duration:{}s,agentIp:{}", response, 
started.elapsed().getSeconds(), agentCurrentIp);
+                    log.info("The configuration is send 
successfully---->{},duration:{}s,agentIp:{}", response, 
started.elapsed().getSeconds(), agentCurrentIp);
                     if (Objects.equals(response, "ok")) {
                         break;
                     }
                 }
             } else {
-                log.info("The current agent IP is not 
connected,ip:{},configuration data:{}", agentIp, GSON.toJson(logCollectMeta));
+                log.info("The current agent IP is not 
connected,ip:{},configuration data:{}", agentIp, GSON.toJson(meta));
             }
-            //Retry policy - Retry 4 times, sleep 500 ms each time
+            //Retry policy - at most 2 attempts, sleep 200 ms after each unsuccessful attempt
             try {
-                TimeUnit.MILLISECONDS.sleep(500L);
+                TimeUnit.MILLISECONDS.sleep(200L);
             } catch (final InterruptedException ignored) {
             }
             count++;
@@ -124,12 +146,10 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
     public List<String> getAllAgentList() {
         List<String> remoteAddress = Lists.newArrayList();
         List<String> ipAddress = Lists.newArrayList();
-        AgentContext.ins().map.entrySet().forEach(agentChannelEntry -> {
-                    String key = agentChannelEntry.getKey();
-                    remoteAddress.add(key);
-                    ipAddress.add(StringUtils.substringBefore(key, 
SYMBOL_COLON));
-                }
-        );
+        AgentContext.ins().map.forEach((key, value) -> {
+            remoteAddress.add(key);
+            ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
+        });
         if (COUNT_INCR.getAndIncrement() % 200 == 0) {
             log.info("The set of remote addresses of the connected agent 
machine is:{}", GSON.toJson(remoteAddress));
         }
@@ -142,13 +162,13 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
         return logAgentMap;
     }
 
-    private String queryCurrentDockerAgentIP(String agentIp, Map<String, 
AgentChannel> logAgentMap) {
+    private String getCorrectDockerAgentIP(String agentIp, Map<String, 
AgentChannel> logAgentMap) {
         if (Objects.equals(agentIp, NetUtil.getLocalIp())) {
             //for Docker handles the agent on the current machine
             final String tempIp = agentIp;
             List<String> ipList = getAgentChannelMap().keySet()
                     .stream().filter(ip -> ip.startsWith("172"))
-                    .collect(Collectors.toList());
+                    .toList();
             Optional<String> optionalS = ipList.stream()
                     .filter(ip -> Objects.equals(logAgentMap.get(ip).getIp(), 
tempIp))
                     .findFirst();
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index d8bb355c..d0362c05 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-agent</artifactId>
-    <version>2.2.15-SNAPSHOT</version>
+    <version>2.2.16-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
index d68c096b..b38e35c8 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/bootstrap/MiLogAgentBootstrap.java
@@ -22,15 +22,16 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.xiaomi.data.push.bo.ClientInfo;
 import com.xiaomi.data.push.rpc.RpcClient;
+import com.xiaomi.youpin.docean.Aop;
+import com.xiaomi.youpin.docean.Ioc;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.ozhera.log.agent.common.Version;
+import org.apache.ozhera.log.agent.config.AgentConfigManager;
+import org.apache.ozhera.log.agent.config.ConfigCenter;
+import org.apache.ozhera.log.agent.config.nacos.NacosConfigCenter;
 import org.apache.ozhera.log.agent.rpc.task.PingTask;
 import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Aop;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
 
 import static org.apache.ozhera.log.utils.ConfigUtils.getConfigValue;
 import static org.apache.ozhera.log.utils.ConfigUtils.getDataHashKey;
@@ -42,7 +43,7 @@ import static 
org.apache.ozhera.log.utils.ConfigUtils.getDataHashKey;
 @Slf4j
 public class MiLogAgentBootstrap {
 
-    public static void main(String[] args) throws IOException {
+    public static void main(String[] args) throws Exception {
         String nacosAddr = getConfigValue("nacosAddr");
         String serviceName = getConfigValue("serviceName");
         log.info("nacosAddr:{},serviceName:{},version:{}", nacosAddr, 
serviceName, new Version());
@@ -63,10 +64,22 @@ public class MiLogAgentBootstrap {
         client.waitStarted();
         log.info("create rpc client finish");
         Aop.ins().init(Maps.newLinkedHashMap());
+        bootstrapAgentConfig(Ioc.ins());
         Ioc.ins().putBean(client).init("org.apache.ozhera.log.agent", 
"com.xiaomi.youpin.docean");
         //Because the client life cycle is advanced, the processor needs to be 
re-registered here
         client.registerProcessor();
         System.in.read();
     }
 
+    private static void bootstrapAgentConfig(Ioc ioc) throws Exception {
+        ConfigCenter agentConfigCenter =
+                new NacosConfigCenter(Config.ins().get("config.address", ""));
+
+        AgentConfigManager agentConfigManager =
+                new AgentConfigManager(agentConfigCenter);
+
+        ioc.putBean(agentConfigManager);
+    }
+
+
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index ab9ac759..fafac094 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
 import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
+import org.apache.ozhera.log.agent.config.AgentConfigManager;
 import org.apache.ozhera.log.agent.export.MsgExporter;
 import org.apache.ozhera.log.agent.filter.FilterChain;
 import org.apache.ozhera.log.agent.input.Input;
@@ -57,7 +58,7 @@ public class ChannelServiceFactory {
     public ChannelServiceFactory(AgentMemoryService agentMemoryService, String 
memoryBasePath) {
         this.agentMemoryService = agentMemoryService;
         this.memoryBasePath = memoryBasePath;
-        String specialFileSuffix = 
getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY);
+        String specialFileSuffix = 
AgentConfigManager.get(DEFAULT_SPECIAL_FILE_SUFFIX_KEY, 
getConfigValue(DEFAULT_SPECIAL_FILE_SUFFIX_KEY));
         if (StringUtils.isNotBlank(specialFileSuffix)) {
             multiSpecialFileSuffix = 
Lists.newArrayList(specialFileSuffix.split(SYMBOL_COMMA));
         }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java
new file mode 100644
index 00000000..05af14e4
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/AgentConfigManager.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.config;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Properties;
+
+/**
+ * @author wtt
+ * @date 2025/12/23 14:13
+ * @version 1.0
+ */
+@Slf4j
+public class AgentConfigManager {
+
+    private static final String AGENT_CONFIG_DATA_ID = 
"org.apache.ozhera.log.agent.config";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private static final Properties CACHE = new Properties();
+
+    private final ConfigCenter configCenter;
+
+    public AgentConfigManager(ConfigCenter configCenter) {
+        this.configCenter = configCenter;
+        init();
+    }
+
+    private void init() {
+        try {
+            String configContent = 
configCenter.getConfig(AGENT_CONFIG_DATA_ID);
+            refresh(configContent);
+
+            configCenter.addListener(AGENT_CONFIG_DATA_ID, this::refresh);
+        } catch (Exception e) {
+            log.error("[AgentConfig] init failed", e);
+        }
+    }
+
+    private synchronized void refresh(String configContent) {
+        if (configContent == null || configContent.isEmpty()) {
+            log.warn("[AgentConfig] empty config, skip refresh");
+            return;
+        }
+
+        try {
+            Properties newConfig = OBJECT_MAPPER.readValue(
+                    configContent,
+                    new TypeReference<Properties>() {
+                    }
+            );
+
+            CACHE.clear();
+            CACHE.putAll(newConfig);
+
+            log.info("[AgentConfig] config refreshed, size={}, content={}",
+                    CACHE.size(), newConfig);
+
+        } catch (Exception e) {
+            log.error("[AgentConfig] refresh cache failed, content={}", 
configContent, e);
+        }
+    }
+
+    public static String get(String key) {
+        return CACHE.getProperty(key);
+    }
+
+    public static String get(String key, String defaultValue) {
+        return CACHE.getProperty(key, defaultValue);
+    }
+
+}
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java
similarity index 52%
copy from 
ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
copy to 
ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java
index 36220cf2..7f990802 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigCenter.java
@@ -16,29 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ozhera.log.agent.extension.nacos;
-
-import com.alibaba.nacos.api.config.ConfigFactory;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-
-import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
-import static org.apache.ozhera.log.common.Constant.DEFAULT_TIME_OUT_MS;
+package org.apache.ozhera.log.agent.config;
 
 /**
  * @author wtt
+ * @date 2025/12/23 14:59
  * @version 1.0
- * @description
- * @date 2025/6/10 16:52
  */
-public class NacosConfigUtil {
-    private final ConfigService configService;
-
-    public NacosConfigUtil(String nacosAddr) throws NacosException {
-        this.configService = ConfigFactory.createConfigService(nacosAddr);
-    }
+public interface ConfigCenter {
+    /**
+     * get the configuration content (json/yaml/properties are all fine, the 
Agent does not care)
+     */
+    String getConfig(String dataId) throws Exception;
 
-    public String getConfig(String dataId) throws NacosException {
-        return configService.getConfig(dataId, DEFAULT_GROUP_ID, 
DEFAULT_TIME_OUT_MS);
-    }
+    /**
+     * listen for configuration changes
+     */
+    void addListener(String dataId, ConfigChangeListener listener);
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java
similarity index 52%
copy from 
ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
copy to 
ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java
index 36220cf2..bb0d2ea0 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/ConfigChangeListener.java
@@ -16,29 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.ozhera.log.agent.extension.nacos;
-
-import com.alibaba.nacos.api.config.ConfigFactory;
-import com.alibaba.nacos.api.config.ConfigService;
-import com.alibaba.nacos.api.exception.NacosException;
-
-import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
-import static org.apache.ozhera.log.common.Constant.DEFAULT_TIME_OUT_MS;
+package org.apache.ozhera.log.agent.config;
 
 /**
  * @author wtt
+ * @date 2025/12/23 14:59
  * @version 1.0
- * @description
- * @date 2025/6/10 16:52
  */
-public class NacosConfigUtil {
-    private final ConfigService configService;
+@FunctionalInterface
+public interface ConfigChangeListener {
 
-    public NacosConfigUtil(String nacosAddr) throws NacosException {
-        this.configService = ConfigFactory.createConfigService(nacosAddr);
-    }
+    void onChange(String newConfig);
 
-    public String getConfig(String dataId) throws NacosException {
-        return configService.getConfig(dataId, DEFAULT_GROUP_ID, 
DEFAULT_TIME_OUT_MS);
-    }
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java
new file mode 100644
index 00000000..afc60d21
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/config/nacos/NacosConfigCenter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.config.nacos;
+
+import com.alibaba.nacos.api.config.ConfigService;
+import com.alibaba.nacos.api.config.listener.Listener;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.agent.config.ConfigCenter;
+import org.apache.ozhera.log.agent.config.ConfigChangeListener;
+import org.apache.ozhera.log.agent.extension.nacos.NacosConfigUtil;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
+
+/**
+ * @author wtt
+ * @date 2025/12/23 15:03
+ * @version 1.0
+ */
+@Slf4j
+public class NacosConfigCenter implements ConfigCenter {
+
+    private final ConfigService configService;
+
+    public NacosConfigCenter(String serverAddr) throws Exception {
+        this.configService = new 
NacosConfigUtil(serverAddr).getConfigService();
+    }
+
+    @Override
+    public String getConfig(String dataId) throws Exception {
+        return configService.getConfig(dataId, DEFAULT_GROUP_ID, 3000);
+    }
+
+    @Override
+    public void addListener(String dataId, ConfigChangeListener listener) {
+        try {
+            configService.addListener(dataId, DEFAULT_GROUP_ID, new Listener() 
{
+                @Override
+                public Executor getExecutor() {
+                    return null;
+                }
+
+                @Override
+                public void receiveConfigInfo(String configInfo) {
+                    listener.onChange(configInfo);
+                }
+            });
+        } catch (Exception e) {
+            log.error("[NacosConfigCenter] add listener failed", e);
+        }
+    }
+}
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
index 36220cf2..f033e6d4 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/extension/nacos/NacosConfigUtil.java
@@ -21,6 +21,7 @@ package org.apache.ozhera.log.agent.extension.nacos;
 import com.alibaba.nacos.api.config.ConfigFactory;
 import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.exception.NacosException;
+import lombok.Getter;
 
 import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
 import static org.apache.ozhera.log.common.Constant.DEFAULT_TIME_OUT_MS;
@@ -31,7 +32,9 @@ import static 
org.apache.ozhera.log.common.Constant.DEFAULT_TIME_OUT_MS;
  * @description
  * @date 2025/6/10 16:52
  */
+@Getter
 public class NacosConfigUtil {
+
     private final ConfigService configService;
 
     public NacosConfigUtil(String nacosAddr) throws NacosException {
@@ -41,4 +44,5 @@ public class NacosConfigUtil {
     public String getConfig(String dataId) throws NacosException {
         return configService.getConfig(dataId, DEFAULT_GROUP_ID, 
DEFAULT_TIME_OUT_MS);
     }
+
 }
diff --git a/ozhera-log/log-agent/src/main/resources/config.properties 
b/ozhera-log/log-agent/src/main/resources/config.properties
index a269eac6..7c9570b3 100644
--- a/ozhera-log/log-agent/src/main/resources/config.properties
+++ b/ozhera-log/log-agent/src/main/resources/config.properties
@@ -17,6 +17,7 @@ app_max_index=30
 
 nacosAddr=${nacosAddr}
 serviceName=${serviceName}
+config.address=${nacosAddr}
 
 log.path=${log.path}
 # agent\u91C7\u96C6\u8FDB\u5EA6\u5B58\u50A8\u8DEF\u5F84


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to