This is an automated email from the ASF dual-hosted git repository.

dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new ac2eb41e feat: optimize log acquisition thread pool and add new 
features (#602)
ac2eb41e is described below

commit ac2eb41e33f998892dbb5f5040c61ec07d440b0c
Author: wtt <[email protected]>
AuthorDate: Tue Aug 19 15:15:11 2025 +0800

    feat: optimize log acquisition thread pool and add new features (#602)
---
 .../log/agent/channel/AbstractChannelService.java  |  8 ++
 .../log/agent/channel/ChannelServiceImpl.java      |  4 +-
 .../agent/channel/WildcardChannelServiceImpl.java  |  3 +-
 .../ozhera/log/agent/common/ExecutorUtil.java      |  8 +-
 .../ozhera/log/manager/dao/MilogLogTailDao.java    |  1 +
 .../ozhera/log/manager/domain/EsCluster.java       | 25 ++++--
 .../service/impl/MilogConfigNacosServiceImpl.java  |  7 +-
 .../impl/MilogMiddlewareConfigServiceImpl.java     | 12 +--
 .../nacos/ManagerLevelFilterConfigListener.java    | 14 ++--
 .../nacos/impl/StreamConfigNacosPublisher.java     | 11 +--
 .../ozhera/log/stream/config/ConfigManager.java    | 92 ++++++++++++----------
 .../log/stream/config/MilogConfigListener.java     |  2 +-
 .../apache/ozhera/log/stream/job/JobManager.java   | 11 +--
 13 files changed, 118 insertions(+), 80 deletions(-)

diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index 009df07f..a82bbdce 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -25,6 +25,7 @@ import com.xiaomi.mone.file.common.FileUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
 import org.apache.ozhera.log.agent.common.ChannelUtil;
+import org.apache.ozhera.log.agent.common.ExecutorUtil;
 import org.apache.ozhera.log.agent.input.Input;
 import org.apache.ozhera.log.api.enums.LogTypeEnum;
 import org.apache.ozhera.log.api.model.meta.LogPattern;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -230,4 +232,10 @@ public abstract class AbstractChannelService implements 
ChannelService {
         return false;
     }
 
+    public ExecutorService getExecutorServiceByType(LogTypeEnum logTypeEnum) {
+        if (LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
+            return ExecutorUtil.TELE_TP_EXECUTOR;
+        }
+        return ExecutorUtil.TP_EXECUTOR;
+    }
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 7e4ef178..07c5e325 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -293,7 +293,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private void opentelemetryMonitor(String configPath) {
         List<String> cleanedPathList = 
ChannelUtil.buildLogExpressList(configPath);
-        monitorFileList.add(MonitorFile.of(configPath, cleanedPathList.get(0), 
logTypeEnum, collectOnce));
+        monitorFileList.add(MonitorFile.of(configPath, 
cleanedPathList.getFirst(), logTypeEnum, collectOnce));
     }
 
     private ReadListener initFileReadListener(MLog mLog, String patternCode, 
String ip, String pattern) {
@@ -393,7 +393,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
             stopOldCurrentFileThread(filePath);
             log.info("start to collect file,channelId:{},fileName:{}", 
channelId, filePath);
             logFileMap.put(filePath, logFile);
-            Future<?> future = ExecutorUtil.submit(() -> {
+            Future<?> future = getExecutorServiceByType(logTypeEnum).submit(() 
-> {
                 try {
                     log.info("filePath:{},is VirtualThread {}, 
thread:{},id:{}", filePath, Thread.currentThread().isVirtual(), 
Thread.currentThread(), Thread.currentThread().threadId());
                     logFile.readLine();
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 2db94d8f..0d261870 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.ozhera.log.agent.input.Input;
 import org.apache.ozhera.log.api.enums.LogTypeEnum;
 import org.apache.ozhera.log.api.model.meta.FilterConf;
 import org.apache.ozhera.log.api.model.msg.LineMessage;
-import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.common.PathUtils;
 
 import java.io.File;
@@ -164,7 +163,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             // Compile the file expression pattern
             Pattern pattern = Pattern.compile(fileExpression);
             for (String monitorPath : monitorPaths) {
-                fileCollFutures.add(ExecutorUtil.submit(() -> 
monitorFileChanges(fileMonitor, monitorPath, pattern)));
+                
fileCollFutures.add(getExecutorServiceByType(getLogTypeEnum()).submit(() -> 
monitorFileChanges(fileMonitor, monitorPath, pattern)));
             }
         } catch (Exception e) {
             log.error("startCollectFile error, channelId: {}, input: {}, ip: 
{}", channelId, GSON.toJson(input), ip, e);
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index a9e94da3..7d5d0441 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -35,7 +35,9 @@ public class ExecutorUtil {
                         log.error("ExecutorUtil-STP-Virtual-Thread 
uncaughtException:{}", e.getMessage(), e);
                     }).factory());
 
-    public static ExecutorService TP_EXECUTOR = createPool();
+    public static ExecutorService TP_EXECUTOR = createPool("TP");
+
+    public static ExecutorService TELE_TP_EXECUTOR = createPool("TELE_TP");
 
     public static ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                          long initialDelay,
@@ -44,9 +46,9 @@ public class ExecutorUtil {
         return STP_EXECUTOR.scheduleAtFixedRate(command, initialDelay, period, 
unit);
     }
 
-    public static ExecutorService createPool() {
+    public static ExecutorService createPool(String name) {
         System.setProperty("jdk.virtualThreadScheduler.parallelism", 
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
-        ThreadFactory factory = 
Thread.ofVirtual().name("ExecutorUtil-TP-Virtual-Thread", 0)
+        ThreadFactory factory = Thread.ofVirtual().name("ExecutorUtil-" + name 
+ "-Virtual-Thread", 0)
                 .uncaughtExceptionHandler((t, e) -> 
log.error("ExecutorUtil-TP-Virtual-Thread uncaughtException:{}", 
e.getMessage(), e)).factory();
         return Executors.newThreadPerTaskExecutor(factory);
     }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
index 4008f223..27c8b078 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
@@ -279,6 +279,7 @@ public class MilogLogTailDao {
 
     public List<MilogLogTailDo> getLogTailByLastId(Long lastId, int pageSize) {
         Sql sql = Sqls.queryEntity("SELECT * FROM milog_logstail WHERE id > 
@lastId ORDER BY id LIMIT @pageSize");
+        sql.setEntity(dao.getEntity(MilogLogTailDo.class));
         sql.params().set("lastId", lastId);
         sql.params().set("pageSize", pageSize);
         dao.execute(sql);
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
index 84343732..e670b661 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
@@ -18,6 +18,12 @@
  */
 package org.apache.ozhera.log.manager.domain;
 
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.StringUtils;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.ozhera.log.common.Constant;
 import org.apache.ozhera.log.manager.bootstrap.LogStoragePlugin;
 import org.apache.ozhera.log.manager.common.context.MoneUserContext;
@@ -25,15 +31,13 @@ import 
org.apache.ozhera.log.manager.mapper.MilogEsClusterMapper;
 import org.apache.ozhera.log.manager.model.pojo.MilogEsClusterDO;
 import 
org.apache.ozhera.log.manager.service.extension.store.StoreExtensionService;
 import 
org.apache.ozhera.log.manager.service.extension.store.StoreExtensionServiceFactory;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.StringUtils;
-import com.xiaomi.youpin.docean.plugin.es.EsService;
-import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.ozhera.log.common.Constant.YES;
 
 @Service
 @Slf4j
@@ -130,6 +134,15 @@ public class EsCluster {
             return null;
         }
         if (clusterList.size() > 1) {
+            List<MilogEsClusterDO> esClusterDoList = clusterList.stream()
+                    .filter(data -> Objects.equals(data.getIsDefault(), YES))
+                    .collect(Collectors.toList());
+            if (CollectionUtils.isNotEmpty(esClusterDoList) && 
esClusterDoList.size() == 1) {
+                return esClusterDoList.getFirst();
+            }
+            if (CollectionUtils.isNotEmpty(esClusterDoList)) {
+                clusterList = esClusterDoList;
+            }
             String zone = MoneUserContext.getCurrentUser().getZone();
             for (MilogEsClusterDO clusterDO : clusterList) {
                 if (Objects.equals(zone, clusterDO.getTag())) {
@@ -137,6 +150,6 @@ public class EsCluster {
                 }
             }
         }
-        return clusterList.get(0);
+        return clusterList.getFirst();
     }
 }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 3e2261a3..712ba725 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -225,12 +225,11 @@ public class MilogConfigNacosServiceImpl implements 
MilogConfigNacosService {
 
     /**
      * compatible When the queried IP is different from the actual one, the 
actual one is returned
-     *
-     * @param existConfig
-     * @param ipList
-     * @return
      */
     private List<String> ensureDefaultCompatibility(MiLogStreamConfig 
existConfig, List<String> ipList) {
+        if (null == existConfig) {
+            return ipList;
+        }
         Set<String> keySet = existConfig.getConfig().keySet();
         if (!CollectionUtils.isEqualCollection(keySet, ipList)) {
             log.info("ipList not belong to config,query list:{},actual 
list:{}", GSON.toJson(ipList), GSON.toJson(keySet));
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
index 7a765fb9..311acfa4 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
@@ -25,6 +25,10 @@ import com.baomidou.mybatisplus.core.conditions.Wrapper;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.api.enums.*;
 import org.apache.ozhera.log.api.model.bo.MiLogResource;
 import org.apache.ozhera.log.api.model.bo.ResourcePage;
@@ -55,10 +59,6 @@ import org.apache.ozhera.log.manager.service.BaseService;
 import org.apache.ozhera.log.manager.service.MilogMiddlewareConfigService;
 import 
org.apache.ozhera.log.manager.service.extension.resource.ResourceExtensionService;
 import 
org.apache.ozhera.log.manager.service.extension.resource.ResourceExtensionServiceFactory;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.jetbrains.annotations.NotNull;
 import org.nutz.dao.Cnd;
 import org.nutz.dao.Condition;
@@ -539,8 +539,8 @@ public class MilogMiddlewareConfigServiceImpl extends 
BaseService implements Mil
 
     private List<String> mergeUserAndResourceLabels(String creatorUId, String 
updaterUId, List<String> existLabels) {
         List<String> resourceDeptLabels = 
resourceExtensionService.generateResourceLabels(updaterUId);
-        if (!Objects.equals(creatorUId, updaterUId) &&
-                CollectionUtils.isNotEmpty(existLabels) && 
!CollUtil.containsAll(existLabels, resourceDeptLabels)) {
+        if ((!Objects.equals(creatorUId, updaterUId) ||
+                CollectionUtils.isEmpty(existLabels)) || 
!CollUtil.containsAll(existLabels, resourceDeptLabels)) {
             existLabels.addAll(resourceDeptLabels);
         }
         return existLabels;
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
index e99ba3d4..5c869640 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
@@ -24,6 +24,7 @@ import com.xiaomi.data.push.common.SafeRun;
 import com.xiaomi.youpin.docean.anno.Component;
 import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
 import org.apache.ozhera.log.manager.dao.MilogLogstoreDao;
 import org.apache.ozhera.log.manager.mapper.MilogLogTemplateMapper;
@@ -92,12 +93,15 @@ public class ManagerLevelFilterConfigListener {
         ScheduledExecutorService scheduledExecutor = Executors
                 
.newSingleThreadScheduledExecutor(ThreadUtil.newNamedThreadFactory("log-level-filter-manager",
 false));
         scheduledExecutor.scheduleAtFixedRate(() ->
-                SafeRun.run(() -> configChangeOperator()), 1, 1, 
TimeUnit.MINUTES);
+                SafeRun.run(this::configChangeOperator), 1, 1, 
TimeUnit.MINUTES);
 
     }
 
     public void configChangeOperator() {
         String filterConfig = nacosConfig.getConfigStr(logLevelFilterKey, 
DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
+        if (StringUtils.isEmpty(filterConfig)) {
+            return;
+        }
         ManagerLogFilterConfig newConfig = GSON.fromJson(filterConfig, 
ManagerLogFilterConfig.class);
 
         if (Objects.equals(config, newConfig)) return;
@@ -112,7 +116,7 @@ public class ManagerLevelFilterConfigListener {
             if (newConfig != null) {
                 List<MilogLogTailDo> newLogtailList = 
logtailDao.getMilogLogtail(newConfig.getTailIdList());
                 newLogtailList.forEach(tail -> 
tail.setFilterLogLevelList(newConfig.getLogLevelList()));
-                List<Long> newIdList = 
newLogtailList.stream().map(MilogLogTailDo::getId).toList();
+                List<Long> newIdList = newConfig.getTailIdList();
                 oldLogtailList = oldLogtailList.stream().filter(tail -> 
!newIdList.contains(tail.getId())).toList();
                 updateMilogLogtailList.addAll(newLogtailList);
             }
@@ -121,18 +125,18 @@ public class ManagerLevelFilterConfigListener {
 
             for (MilogLogTailDo tailDo : updateMilogLogtailList) {
                 boolean isSuccess = logtailDao.update(tailDo);
-                if (isSuccess){
+                if (isSuccess) {
                     log.info("update tail and send to agent, the message of 
tail is: {}", tailDo);
                     updateSingleTail(tailDo);
                 }
             }
         }
 
-        if (newConfig!= null && newConfig.getEnableGlobalFilter()) {
+        if (newConfig != null && newConfig.getEnableGlobalFilter()) {
             if (config != null && config.getEnableGlobalFilter() && 
areElementsSameIgnoreCase(newConfig.getLogLevelList(), 
config.getLogLevelList())) {
                 return;
             }
-            globalUpdateSendMsg();
+//            globalUpdateSendMsg();
         }
         config = newConfig;
     }
diff --git 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
index ca7509d6..2682d64d 100644
--- 
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
+++ 
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
@@ -20,12 +20,12 @@ package org.apache.ozhera.log.manager.service.nacos.impl;
 
 import com.alibaba.nacos.api.config.ConfigService;
 import com.alibaba.nacos.api.exception.NacosException;
-import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
-import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
-import org.apache.ozhera.log.model.MiLogStreamConfig;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
+import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
+import org.apache.ozhera.log.model.MiLogStreamConfig;
 
 import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
 
@@ -44,13 +44,14 @@ public class StreamConfigNacosPublisher implements 
DynamicConfigPublisher<MiLogS
 
     @Override
     public synchronized void publish(Long spaceId, MiLogStreamConfig config) {
-        if (config == null) {
+        if (config == null || null == configService) {
+            log.error("config is null or configService is 
null,spaceId:{},config:{}", spaceId, gson.toJson(config));
             return;
         }
         try {
             
configService.publishConfig(CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId),
 DEFAULT_GROUP_ID, gson.toJson(config));
         } catch (NacosException e) {
-            log.error(String.format("Create namespace push data exceptions, 
parameters:%s", gson.toJson(config)), e);
+            log.error("Create namespace push data exceptions, parameters:{}", 
gson.toJson(config), e);
         }
     }
 
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
index d73b5d3c..58c136fd 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
@@ -19,7 +19,8 @@
 package org.apache.ozhera.log.stream.config;
 
 import com.alibaba.nacos.api.config.listener.Listener;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.xiaomi.youpin.docean.Ioc;
 import com.xiaomi.youpin.docean.anno.Service;
 import com.xiaomi.youpin.docean.common.StringUtils;
@@ -73,11 +74,13 @@ public class ConfigManager {
      * key: spaceId
      * value: milogSpaceData
      */
+    @Getter
     private ConcurrentHashMap<Long, MilogSpaceData> milogSpaceDataMap = new 
ConcurrentHashMap<>();
 
-    private Gson gson = new Gson();
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    ;
 
-    private ReentrantLock spaceLock = new ReentrantLock();
+    private final ReentrantLock spaceLock = new ReentrantLock();
 
     /**
      * Executed once when the service starts
@@ -101,7 +104,7 @@ public class ConfigManager {
             for (String uniqueMark : split) {
                 if (config.containsKey(uniqueMark)) {
                     Map<Long, String> milogStreamDataMap = 
config.get(uniqueMark);
-                    log.info("[ConfigManager.initConfigManager] 
uniqueMark:{},data:{}", uniqueMark, gson.toJson(milogStreamDataMap));
+                    log.info("[ConfigManager.initConfigManager] 
uniqueMark:{},data:{}", uniqueMark, 
objectMapper.writeValueAsString(milogStreamDataMap));
                     for (Long spaceId : milogStreamDataMap.keySet()) {
                         final String dataId = milogStreamDataMap.get(spaceId);
                         // init spaceData config
@@ -130,7 +133,8 @@ public class ConfigManager {
         listeners.put(spaceId, listener);
     }
 
-    private static ExecutorService THREAD_POOL = 
Executors.newVirtualThreadPerTaskExecutor();
+    private static final ExecutorService THREAD_POOL = 
Executors.newVirtualThreadPerTaskExecutor();
+    private static final ObjectMapper MAPPER = new ObjectMapper();
 
     public void listenLogStreamConfig() {
         nacosConfig.addListener(spaceDataId, DEFAULT_GROUP_ID, new Listener() {
@@ -142,18 +146,18 @@ public class ConfigManager {
             @Override
             public void receiveConfigInfo(String spaceStr) {
                 try {
-                    MiLogStreamConfig milogStreamConfig = 
GSON.fromJson(spaceStr, MiLogStreamConfig.class);
+                    MiLogStreamConfig milogStreamConfig = 
MAPPER.readValue(spaceStr, MiLogStreamConfig.class);
                     handleMiLogStreamConfig(milogStreamConfig);
                 } catch (Exception e) {
-                    log.error("Error deserializing 
MiLogStreamConfig,spaceStr:{}", spaceStr, e);
+                    log.error(String.format("error deserializing 
MiLogStreamConfig,spaceStr:%s", spaceStr), e);
                 }
             }
         });
     }
 
-    private void handleMiLogStreamConfig(MiLogStreamConfig milogStreamConfig) {
+    private void handleMiLogStreamConfig(MiLogStreamConfig milogStreamConfig) 
throws Exception {
         String uniqueMark = StreamUtils.getCurrentMachineMark();
-        log.info("listening namespace received a configuration 
request,{},uniqueMark:{}", gson.toJson(milogStreamConfig), uniqueMark);
+        log.info("listening namespace received a configuration 
request,{},uniqueMark:{}", objectMapper.writeValueAsString(milogStreamConfig), 
uniqueMark);
 
         if (milogStreamConfig != null) {
             Map<String, Map<Long, String>> config = 
milogStreamConfig.getConfig();
@@ -167,13 +171,14 @@ public class ConfigManager {
         }
     }
 
-    private void processConfigForUniqueMark(String uniqueMark, Map<String, 
Map<Long, String>> config) {
+    private void processConfigForUniqueMark(String uniqueMark, Map<String, 
Map<Long, String>> config) throws Exception {
         StreamCommonExtension extensionInstance = 
getStreamCommonExtensionInstance();
         if (!extensionInstance.checkUniqueMarkExists(uniqueMark, config)) {
             log.warn("listen dataID:{},groupId:{},but receive config is 
empty,uniqueMarks:{}", spaceDataId, DEFAULT_GROUP_ID, uniqueMark);
             return;
         }
         Map<Long, String> dataIdMap = 
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
+        log.info("uniqueMark:{},data key:{}", uniqueMark, 
objectMapper.writeValueAsString(dataIdMap.keySet()));
         if (spaceLock.tryLock()) {
             try {
                 stopUnusefulListenerAndJob(dataIdMap);
@@ -207,7 +212,8 @@ public class ConfigManager {
      * @param newLogStreamDataMap new {spaceId,dataId}
      * @return Returns a list of {dataId} that are no longer needed
      */
-    public List<Long> unUseFilter(Map<Long, String> newLogStreamDataMap) {
+    public List<Long> unUseFilter(Map<Long, String> newLogStreamDataMap) 
throws Exception {
+        log.info("unUseFilter,newConfig key:{},oldConfig key:{}", 
objectMapper.writeValueAsString(newLogStreamDataMap.keySet()), 
objectMapper.writeValueAsString(milogSpaceDataMap.keySet()));
         List<Long> unUseSpaceIds = new ArrayList<>(milogSpaceDataMap.keySet());
         unUseSpaceIds.removeIf(newLogStreamDataMap::containsKey);
         return unUseSpaceIds;
@@ -219,7 +225,7 @@ public class ConfigManager {
      *
      * @param milogStreamDataMap
      */
-    public void stopUnusefulListenerAndJob(Map<Long, String> 
milogStreamDataMap) {
+    public void stopUnusefulListenerAndJob(Map<Long, String> 
milogStreamDataMap) throws Exception {
         List<Long> unUseSpaceIds = unUseFilter(milogStreamDataMap);
         if (CollectionUtils.isEmpty(unUseSpaceIds)) {
             return;
@@ -230,35 +236,39 @@ public class ConfigManager {
         unUseSpaceIds.forEach(this::stopAndRemoveListenerAndJob);
     }
 
-    private void logUnusefulSpaceIds(List<Long> unUseSpaceIds) {
-        log.info("[listening namespace] The space ID that needs to be stopped: 
{}", gson.toJson(unUseSpaceIds));
+    private void logUnusefulSpaceIds(List<Long> unUseSpaceIds) throws 
Exception {
+        log.info("[listening namespace] The space ID that needs to be stopped: 
{}", objectMapper.writeValueAsString(unUseSpaceIds));
         logExistingListeners();
     }
 
-    private void logExistingListeners() {
+    private void logExistingListeners() throws Exception {
         List<Long> listenerKeys = new ArrayList<>(listeners.keySet());
-        log.info("[listening namespace] all listeners already exist: {}", 
gson.toJson(listenerKeys));
+        log.info("[listening namespace] all listeners already exist: {}", 
objectMapper.writeValueAsString(listenerKeys));
     }
 
 
     private void stopAndRemoveListenerAndJob(Long stopSpaceId) {
-        MilogConfigListener spaceConfigListener = listeners.get(stopSpaceId);
+        try {
+            MilogConfigListener spaceConfigListener = 
listeners.get(stopSpaceId);
 
-        if (spaceConfigListener != null) {
-            log.info("stopping the space ID: {}", stopSpaceId);
-            spaceConfigListener.shutdown();
-            log.info("Removing stopSpaceId: {} log tail config listener", 
stopSpaceId);
-            listeners.remove(stopSpaceId);
-        } else {
-            log.warn("No space ID in the current listener is ready to be 
stopped: {}", stopSpaceId);
+            if (spaceConfigListener != null) {
+                log.info("stopping the space ID: {}", stopSpaceId);
+                spaceConfigListener.shutdown();
+                log.info("Removing stopSpaceId: {} log tail config listener", 
stopSpaceId);
+                listeners.remove(stopSpaceId);
+            } else {
+                log.warn("No space ID in the current listener is ready to be 
stopped: {}", stopSpaceId);
+            }
+            stopJob(stopSpaceId, spaceConfigListener);
+        } catch (Exception e) {
+            log.error("error stopping space ID: {}", stopSpaceId, e);
         }
-        stopJob(stopSpaceId, spaceConfigListener);
     }
 
-    private void stopJob(Long stopSpaceId, MilogConfigListener 
spaceConfigListener) {
+    private void stopJob(Long stopSpaceId, MilogConfigListener 
spaceConfigListener) throws JsonProcessingException {
         MilogSpaceData milogSpaceData = milogSpaceDataMap.get(stopSpaceId);
 
-        if (milogSpaceData != null) {
+        if (milogSpaceData != null && null != spaceConfigListener) {
             spaceConfigListener.getJobManager().closeJobs(milogSpaceData);
             log.info("Closing stopSpaceId: {} logTail consumer job", 
stopSpaceId);
         } else {
@@ -272,21 +282,25 @@ public class ConfigManager {
      * 1. The listener corresponds to the data ID
      * 2. start job
      *
-     * @param milogStreamDataMap
      */
     public void startNewListenerAndJob(Map<Long, String> milogStreamDataMap) {
         milogStreamDataMap.forEach((spaceId, dataId) -> {
             // there is no listener corresponding to the spaceId in memory
-            if (!existListener(spaceId)) {
-                log.info("startNewListenerAndJob for spaceId:{}", spaceId);
-                // Get a copy of the spaceData configuration through the 
dataID and put it in the configListener cache
-                MilogSpaceData milogSpaceData = getMilogSpaceData(dataId);
+            try {
+                if (!existListener(spaceId)) {
+                    log.info("startNewListenerAndJob for spaceId start:{}", 
spaceId);
+                    // Get a copy of the spaceData configuration through the 
dataID and put it in the configListener cache
+                    MilogSpaceData milogSpaceData = getMilogSpaceData(dataId);
 
-                // Listen configuration
-                MilogConfigListener configListener = new 
MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData, 
nacosConfig);
-                addListener(spaceId, configListener);
+                    // Listen configuration
+                    MilogConfigListener configListener = new 
MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData, 
nacosConfig);
+                    addListener(spaceId, configListener);
 
-                milogSpaceDataMap.put(spaceId, milogSpaceData);
+                    milogSpaceDataMap.put(spaceId, milogSpaceData);
+                    log.info("startNewListenerAndJob for spaceId 
end:{},dataId:{}", spaceId, dataId);
+                }
+            } catch (Exception e) {
+                log.error(String.format("startNewListenerAndJob for spaceId 
error:%s", spaceId), e);
             }
         });
     }
@@ -295,7 +309,7 @@ public class ConfigManager {
         try {
             // get a copy of the spaceData configuration through the data ID
             String logSpaceDataStr = nacosConfig.getConfigStr(dataId, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
-            return StringUtils.isNotEmpty(logSpaceDataStr) ? GSON.fromJson(logSpaceDataStr, MilogSpaceData.class) : new MilogSpaceData();
+            return StringUtils.isNotEmpty(logSpaceDataStr) ? MAPPER.readValue(logSpaceDataStr, MilogSpaceData.class) : new MilogSpaceData();
         } catch (Exception e) {
             log.error("Failed to get MilogSpaceData for dataId: {}", dataId, e);
             // Handle the exception by providing a default logSpaceData
@@ -303,8 +317,4 @@ public class ConfigManager {
         }
     }
 
-    public ConcurrentHashMap<Long, MilogSpaceData> getMilogSpaceDataMap() {
-        return milogSpaceDataMap;
-    }
-
 }
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index b0d31c9e..f0e8a51b 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -315,8 +315,8 @@ public class MilogConfigListener {
                     if (StringUtils.equals(originConfig, dataValue)) {
                         return;
                     }
-                    originConfig = dataValue;
                     log.info("listen tail received a configuration request:{},origin config:{}", dataValue, originConfig);
+                    originConfig = dataValue;
                     if (StringUtils.isNotEmpty(dataValue) && !Constant.NULLVALUE.equals(dataValue)) {
                         dataValue = streamCommonExtension.dataPreProcess(dataValue);
                         MilogSpaceData newMilogSpaceData = GSON.fromJson(dataValue, MilogSpaceData.class);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index 41665703..c650c267 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -18,7 +18,8 @@
  */
 package org.apache.ozhera.log.stream.job;
 
-import com.google.gson.Gson;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.xiaomi.youpin.docean.Ioc;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -55,7 +56,7 @@ public class JobManager {
 
     private String sinkJobType;
 
-    private Gson gson = new Gson();
+    private ObjectMapper objectMapper = new ObjectMapper();
 
     public JobManager() {
         sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
@@ -63,9 +64,9 @@ public class JobManager {
         jobs = new ConcurrentHashMap<>();
     }
 
-    public void closeJobs(MilogSpaceData milogSpaceData) {
+    public void closeJobs(MilogSpaceData milogSpaceData) throws JsonProcessingException {
         List<SinkConfig> configList = milogSpaceData.getSpaceConfig();
-        log.info("tasks that are already running:{},The task that is about to be shut down:{}", gson.toJson(jobs), gson.toJson(milogSpaceData));
+        log.info("tasks that are already running:{},The task that is about to be shut down:{}", objectMapper.writeValueAsString(jobs), objectMapper.writeValueAsString(milogSpaceData));
         if (CollectionUtils.isNotEmpty(configList)) {
             for (SinkConfig sinkConfig : configList) {
                 List<LogtailConfig> tailConfigs = sinkConfig.getLogtailConfigs();
@@ -189,7 +190,7 @@ public class JobManager {
             String clusterInfo = logtailConfig.getClusterInfo();
             String type = logtailConfig.getType();
             if (StringUtils.isEmpty(clusterInfo) || StringUtils.isEmpty(logtailConfig.getTopic())) {
-                log.info("start job error,ak or sk or logTailConfig null,ak:{},sk:{},logTailConfig:{}", ak, sk, gson.toJson(logtailConfig));
+                log.info("start job error,ak or sk or logTailConfig null,ak:{},sk:{},logTailConfig:{}", ak, sk, objectMapper.writeValueAsString(logtailConfig));
                 return;
             }
             startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, sinkConfig, logSpaceId);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to