This is an automated email from the ASF dual-hosted git repository.
dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new ac2eb41e feat: optimize log acquisition thread pool and add new features (#602)
ac2eb41e is described below
commit ac2eb41e33f998892dbb5f5040c61ec07d440b0c
Author: wtt <[email protected]>
AuthorDate: Tue Aug 19 15:15:11 2025 +0800
feat: optimize log acquisition thread pool and add new features (#602)
---
.../log/agent/channel/AbstractChannelService.java | 8 ++
.../log/agent/channel/ChannelServiceImpl.java | 4 +-
.../agent/channel/WildcardChannelServiceImpl.java | 3 +-
.../ozhera/log/agent/common/ExecutorUtil.java | 8 +-
.../ozhera/log/manager/dao/MilogLogTailDao.java | 1 +
.../ozhera/log/manager/domain/EsCluster.java | 25 ++++--
.../service/impl/MilogConfigNacosServiceImpl.java | 7 +-
.../impl/MilogMiddlewareConfigServiceImpl.java | 12 +--
.../nacos/ManagerLevelFilterConfigListener.java | 14 ++--
.../nacos/impl/StreamConfigNacosPublisher.java | 11 +--
.../ozhera/log/stream/config/ConfigManager.java | 92 ++++++++++++----------
.../log/stream/config/MilogConfigListener.java | 2 +-
.../apache/ozhera/log/stream/job/JobManager.java | 11 +--
13 files changed, 118 insertions(+), 80 deletions(-)
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index 009df07f..a82bbdce 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -25,6 +25,7 @@ import com.xiaomi.mone.file.common.FileUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
import org.apache.ozhera.log.agent.common.ChannelUtil;
+import org.apache.ozhera.log.agent.common.ExecutorUtil;
import org.apache.ozhera.log.agent.input.Input;
import org.apache.ozhera.log.api.enums.LogTypeEnum;
import org.apache.ozhera.log.api.model.meta.LogPattern;
@@ -36,6 +37,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -230,4 +232,10 @@ public abstract class AbstractChannelService implements
ChannelService {
return false;
}
+ public ExecutorService getExecutorServiceByType(LogTypeEnum logTypeEnum) {
+ if (LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
+ return ExecutorUtil.TELE_TP_EXECUTOR;
+ }
+ return ExecutorUtil.TP_EXECUTOR;
+ }
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 7e4ef178..07c5e325 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -293,7 +293,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void opentelemetryMonitor(String configPath) {
List<String> cleanedPathList =
ChannelUtil.buildLogExpressList(configPath);
- monitorFileList.add(MonitorFile.of(configPath, cleanedPathList.get(0),
logTypeEnum, collectOnce));
+ monitorFileList.add(MonitorFile.of(configPath,
cleanedPathList.getFirst(), logTypeEnum, collectOnce));
}
private ReadListener initFileReadListener(MLog mLog, String patternCode,
String ip, String pattern) {
@@ -393,7 +393,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
stopOldCurrentFileThread(filePath);
log.info("start to collect file,channelId:{},fileName:{}",
channelId, filePath);
logFileMap.put(filePath, logFile);
- Future<?> future = ExecutorUtil.submit(() -> {
+ Future<?> future = getExecutorServiceByType(logTypeEnum).submit(()
-> {
try {
log.info("filePath:{},is VirtualThread {},
thread:{},id:{}", filePath, Thread.currentThread().isVirtual(),
Thread.currentThread(), Thread.currentThread().threadId());
logFile.readLine();
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 2db94d8f..0d261870 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.ozhera.log.agent.input.Input;
import org.apache.ozhera.log.api.enums.LogTypeEnum;
import org.apache.ozhera.log.api.model.meta.FilterConf;
import org.apache.ozhera.log.api.model.msg.LineMessage;
-import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.common.PathUtils;
import java.io.File;
@@ -164,7 +163,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
// Compile the file expression pattern
Pattern pattern = Pattern.compile(fileExpression);
for (String monitorPath : monitorPaths) {
- fileCollFutures.add(ExecutorUtil.submit(() ->
monitorFileChanges(fileMonitor, monitorPath, pattern)));
+
fileCollFutures.add(getExecutorServiceByType(getLogTypeEnum()).submit(() ->
monitorFileChanges(fileMonitor, monitorPath, pattern)));
}
} catch (Exception e) {
log.error("startCollectFile error, channelId: {}, input: {}, ip:
{}", channelId, GSON.toJson(input), ip, e);
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index a9e94da3..7d5d0441 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -35,7 +35,9 @@ public class ExecutorUtil {
log.error("ExecutorUtil-STP-Virtual-Thread
uncaughtException:{}", e.getMessage(), e);
}).factory());
- public static ExecutorService TP_EXECUTOR = createPool();
+ public static ExecutorService TP_EXECUTOR = createPool("TP");
+
+ public static ExecutorService TELE_TP_EXECUTOR = createPool("TELE_TP");
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
@@ -44,9 +46,9 @@ public class ExecutorUtil {
return STP_EXECUTOR.scheduleAtFixedRate(command, initialDelay, period,
unit);
}
- public static ExecutorService createPool() {
+ public static ExecutorService createPool(String name) {
System.setProperty("jdk.virtualThreadScheduler.parallelism",
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
- ThreadFactory factory =
Thread.ofVirtual().name("ExecutorUtil-TP-Virtual-Thread", 0)
+ ThreadFactory factory = Thread.ofVirtual().name("ExecutorUtil-" + name
+ "-Virtual-Thread", 0)
.uncaughtExceptionHandler((t, e) ->
log.error("ExecutorUtil-TP-Virtual-Thread uncaughtException:{}",
e.getMessage(), e)).factory();
return Executors.newThreadPerTaskExecutor(factory);
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
index 4008f223..27c8b078 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/dao/MilogLogTailDao.java
@@ -279,6 +279,7 @@ public class MilogLogTailDao {
public List<MilogLogTailDo> getLogTailByLastId(Long lastId, int pageSize) {
Sql sql = Sqls.queryEntity("SELECT * FROM milog_logstail WHERE id >
@lastId ORDER BY id LIMIT @pageSize");
+ sql.setEntity(dao.getEntity(MilogLogTailDo.class));
sql.params().set("lastId", lastId);
sql.params().set("pageSize", pageSize);
dao.execute(sql);
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
index 84343732..e670b661 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/EsCluster.java
@@ -18,6 +18,12 @@
*/
package org.apache.ozhera.log.manager.domain;
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.StringUtils;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.ozhera.log.common.Constant;
import org.apache.ozhera.log.manager.bootstrap.LogStoragePlugin;
import org.apache.ozhera.log.manager.common.context.MoneUserContext;
@@ -25,15 +31,13 @@ import
org.apache.ozhera.log.manager.mapper.MilogEsClusterMapper;
import org.apache.ozhera.log.manager.model.pojo.MilogEsClusterDO;
import
org.apache.ozhera.log.manager.service.extension.store.StoreExtensionService;
import
org.apache.ozhera.log.manager.service.extension.store.StoreExtensionServiceFactory;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.StringUtils;
-import com.xiaomi.youpin.docean.plugin.es.EsService;
-import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.ozhera.log.common.Constant.YES;
@Service
@Slf4j
@@ -130,6 +134,15 @@ public class EsCluster {
return null;
}
if (clusterList.size() > 1) {
+ List<MilogEsClusterDO> esClusterDoList = clusterList.stream()
+ .filter(data -> Objects.equals(data.getIsDefault(), YES))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(esClusterDoList) &&
esClusterDoList.size() == 1) {
+ return esClusterDoList.getFirst();
+ }
+ if (CollectionUtils.isNotEmpty(esClusterDoList)) {
+ clusterList = esClusterDoList;
+ }
String zone = MoneUserContext.getCurrentUser().getZone();
for (MilogEsClusterDO clusterDO : clusterList) {
if (Objects.equals(zone, clusterDO.getTag())) {
@@ -137,6 +150,6 @@ public class EsCluster {
}
}
}
- return clusterList.get(0);
+ return clusterList.getFirst();
}
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 3e2261a3..712ba725 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -225,12 +225,11 @@ public class MilogConfigNacosServiceImpl implements
MilogConfigNacosService {
/**
* compatible When the queried IP is different from the actual one, the
actual one is returned
- *
- * @param existConfig
- * @param ipList
- * @return
*/
private List<String> ensureDefaultCompatibility(MiLogStreamConfig
existConfig, List<String> ipList) {
+ if (null == existConfig) {
+ return ipList;
+ }
Set<String> keySet = existConfig.getConfig().keySet();
if (!CollectionUtils.isEqualCollection(keySet, ipList)) {
log.info("ipList not belong to config,query list:{},actual
list:{}", GSON.toJson(ipList), GSON.toJson(keySet));
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
index 7a765fb9..311acfa4 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogMiddlewareConfigServiceImpl.java
@@ -25,6 +25,10 @@ import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.api.enums.*;
import org.apache.ozhera.log.api.model.bo.MiLogResource;
import org.apache.ozhera.log.api.model.bo.ResourcePage;
@@ -55,10 +59,6 @@ import org.apache.ozhera.log.manager.service.BaseService;
import org.apache.ozhera.log.manager.service.MilogMiddlewareConfigService;
import
org.apache.ozhera.log.manager.service.extension.resource.ResourceExtensionService;
import
org.apache.ozhera.log.manager.service.extension.resource.ResourceExtensionServiceFactory;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.nutz.dao.Cnd;
import org.nutz.dao.Condition;
@@ -539,8 +539,8 @@ public class MilogMiddlewareConfigServiceImpl extends
BaseService implements Mil
private List<String> mergeUserAndResourceLabels(String creatorUId, String
updaterUId, List<String> existLabels) {
List<String> resourceDeptLabels =
resourceExtensionService.generateResourceLabels(updaterUId);
- if (!Objects.equals(creatorUId, updaterUId) &&
- CollectionUtils.isNotEmpty(existLabels) &&
!CollUtil.containsAll(existLabels, resourceDeptLabels)) {
+ if ((!Objects.equals(creatorUId, updaterUId) ||
+ CollectionUtils.isEmpty(existLabels)) ||
!CollUtil.containsAll(existLabels, resourceDeptLabels)) {
existLabels.addAll(resourceDeptLabels);
}
return existLabels;
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
index e99ba3d4..5c869640 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/ManagerLevelFilterConfigListener.java
@@ -24,6 +24,7 @@ import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.youpin.docean.anno.Component;
import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.manager.dao.MilogLogTailDao;
import org.apache.ozhera.log.manager.dao.MilogLogstoreDao;
import org.apache.ozhera.log.manager.mapper.MilogLogTemplateMapper;
@@ -92,12 +93,15 @@ public class ManagerLevelFilterConfigListener {
ScheduledExecutorService scheduledExecutor = Executors
.newSingleThreadScheduledExecutor(ThreadUtil.newNamedThreadFactory("log-level-filter-manager",
false));
scheduledExecutor.scheduleAtFixedRate(() ->
- SafeRun.run(() -> configChangeOperator()), 1, 1,
TimeUnit.MINUTES);
+ SafeRun.run(this::configChangeOperator), 1, 1,
TimeUnit.MINUTES);
}
public void configChangeOperator() {
String filterConfig = nacosConfig.getConfigStr(logLevelFilterKey,
DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
+ if (StringUtils.isEmpty(filterConfig)) {
+ return;
+ }
ManagerLogFilterConfig newConfig = GSON.fromJson(filterConfig,
ManagerLogFilterConfig.class);
if (Objects.equals(config, newConfig)) return;
@@ -112,7 +116,7 @@ public class ManagerLevelFilterConfigListener {
if (newConfig != null) {
List<MilogLogTailDo> newLogtailList =
logtailDao.getMilogLogtail(newConfig.getTailIdList());
newLogtailList.forEach(tail ->
tail.setFilterLogLevelList(newConfig.getLogLevelList()));
- List<Long> newIdList =
newLogtailList.stream().map(MilogLogTailDo::getId).toList();
+ List<Long> newIdList = newConfig.getTailIdList();
oldLogtailList = oldLogtailList.stream().filter(tail ->
!newIdList.contains(tail.getId())).toList();
updateMilogLogtailList.addAll(newLogtailList);
}
@@ -121,18 +125,18 @@ public class ManagerLevelFilterConfigListener {
for (MilogLogTailDo tailDo : updateMilogLogtailList) {
boolean isSuccess = logtailDao.update(tailDo);
- if (isSuccess){
+ if (isSuccess) {
log.info("update tail and send to agent, the message of
tail is: {}", tailDo);
updateSingleTail(tailDo);
}
}
}
- if (newConfig!= null && newConfig.getEnableGlobalFilter()) {
+ if (newConfig != null && newConfig.getEnableGlobalFilter()) {
if (config != null && config.getEnableGlobalFilter() &&
areElementsSameIgnoreCase(newConfig.getLogLevelList(),
config.getLogLevelList())) {
return;
}
- globalUpdateSendMsg();
+// globalUpdateSendMsg();
}
config = newConfig;
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
index ca7509d6..2682d64d 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
@@ -20,12 +20,12 @@ package org.apache.ozhera.log.manager.service.nacos.impl;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.exception.NacosException;
-import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
-import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
-import org.apache.ozhera.log.model.MiLogStreamConfig;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
+import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
+import org.apache.ozhera.log.model.MiLogStreamConfig;
import static org.apache.ozhera.log.common.Constant.DEFAULT_GROUP_ID;
@@ -44,13 +44,14 @@ public class StreamConfigNacosPublisher implements
DynamicConfigPublisher<MiLogS
@Override
public synchronized void publish(Long spaceId, MiLogStreamConfig config) {
- if (config == null) {
+ if (config == null || null == configService) {
+ log.error("config is null or configService is
null,spaceId:{},config:{}", spaceId, gson.toJson(config));
return;
}
try {
configService.publishConfig(CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId),
DEFAULT_GROUP_ID, gson.toJson(config));
} catch (NacosException e) {
- log.error(String.format("Create namespace push data exceptions,
parameters:%s", gson.toJson(config)), e);
+ log.error("Create namespace push data exceptions, parameters:{}",
gson.toJson(config), e);
}
}
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
index d73b5d3c..58c136fd 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
@@ -19,7 +19,8 @@
package org.apache.ozhera.log.stream.config;
import com.alibaba.nacos.api.config.listener.Listener;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.anno.Service;
import com.xiaomi.youpin.docean.common.StringUtils;
@@ -73,11 +74,13 @@ public class ConfigManager {
* key: spaceId
* value: milogSpaceData
*/
+ @Getter
private ConcurrentHashMap<Long, MilogSpaceData> milogSpaceDataMap = new
ConcurrentHashMap<>();
- private Gson gson = new Gson();
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ ;
- private ReentrantLock spaceLock = new ReentrantLock();
+ private final ReentrantLock spaceLock = new ReentrantLock();
/**
* Executed once when the service starts
@@ -101,7 +104,7 @@ public class ConfigManager {
for (String uniqueMark : split) {
if (config.containsKey(uniqueMark)) {
Map<Long, String> milogStreamDataMap =
config.get(uniqueMark);
- log.info("[ConfigManager.initConfigManager]
uniqueMark:{},data:{}", uniqueMark, gson.toJson(milogStreamDataMap));
+ log.info("[ConfigManager.initConfigManager]
uniqueMark:{},data:{}", uniqueMark,
objectMapper.writeValueAsString(milogStreamDataMap));
for (Long spaceId : milogStreamDataMap.keySet()) {
final String dataId = milogStreamDataMap.get(spaceId);
// init spaceData config
@@ -130,7 +133,8 @@ public class ConfigManager {
listeners.put(spaceId, listener);
}
- private static ExecutorService THREAD_POOL =
Executors.newVirtualThreadPerTaskExecutor();
+ private static final ExecutorService THREAD_POOL =
Executors.newVirtualThreadPerTaskExecutor();
+ private static final ObjectMapper MAPPER = new ObjectMapper();
public void listenLogStreamConfig() {
nacosConfig.addListener(spaceDataId, DEFAULT_GROUP_ID, new Listener() {
@@ -142,18 +146,18 @@ public class ConfigManager {
@Override
public void receiveConfigInfo(String spaceStr) {
try {
- MiLogStreamConfig milogStreamConfig =
GSON.fromJson(spaceStr, MiLogStreamConfig.class);
+ MiLogStreamConfig milogStreamConfig =
MAPPER.readValue(spaceStr, MiLogStreamConfig.class);
handleMiLogStreamConfig(milogStreamConfig);
} catch (Exception e) {
- log.error("Error deserializing
MiLogStreamConfig,spaceStr:{}", spaceStr, e);
+ log.error(String.format("error deserializing
MiLogStreamConfig,spaceStr:%s", spaceStr), e);
}
}
});
}
- private void handleMiLogStreamConfig(MiLogStreamConfig milogStreamConfig) {
+ private void handleMiLogStreamConfig(MiLogStreamConfig milogStreamConfig)
throws Exception {
String uniqueMark = StreamUtils.getCurrentMachineMark();
- log.info("listening namespace received a configuration
request,{},uniqueMark:{}", gson.toJson(milogStreamConfig), uniqueMark);
+ log.info("listening namespace received a configuration
request,{},uniqueMark:{}", objectMapper.writeValueAsString(milogStreamConfig),
uniqueMark);
if (milogStreamConfig != null) {
Map<String, Map<Long, String>> config =
milogStreamConfig.getConfig();
@@ -167,13 +171,14 @@ public class ConfigManager {
}
}
- private void processConfigForUniqueMark(String uniqueMark, Map<String,
Map<Long, String>> config) {
+ private void processConfigForUniqueMark(String uniqueMark, Map<String,
Map<Long, String>> config) throws Exception {
StreamCommonExtension extensionInstance =
getStreamCommonExtensionInstance();
if (!extensionInstance.checkUniqueMarkExists(uniqueMark, config)) {
log.warn("listen dataID:{},groupId:{},but receive config is
empty,uniqueMarks:{}", spaceDataId, DEFAULT_GROUP_ID, uniqueMark);
return;
}
Map<Long, String> dataIdMap =
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
+ log.info("uniqueMark:{},data key:{}", uniqueMark,
objectMapper.writeValueAsString(dataIdMap.keySet()));
if (spaceLock.tryLock()) {
try {
stopUnusefulListenerAndJob(dataIdMap);
@@ -207,7 +212,8 @@ public class ConfigManager {
* @param newLogStreamDataMap new {spaceId,dataId}
* @return Returns a list of {dataId} that are no longer needed
*/
- public List<Long> unUseFilter(Map<Long, String> newLogStreamDataMap) {
+ public List<Long> unUseFilter(Map<Long, String> newLogStreamDataMap)
throws Exception {
+ log.info("unUseFilter,newConfig key:{},oldConfig key:{}",
objectMapper.writeValueAsString(newLogStreamDataMap.keySet()),
objectMapper.writeValueAsString(milogSpaceDataMap.keySet()));
List<Long> unUseSpaceIds = new ArrayList<>(milogSpaceDataMap.keySet());
unUseSpaceIds.removeIf(newLogStreamDataMap::containsKey);
return unUseSpaceIds;
@@ -219,7 +225,7 @@ public class ConfigManager {
*
* @param milogStreamDataMap
*/
- public void stopUnusefulListenerAndJob(Map<Long, String>
milogStreamDataMap) {
+ public void stopUnusefulListenerAndJob(Map<Long, String>
milogStreamDataMap) throws Exception {
List<Long> unUseSpaceIds = unUseFilter(milogStreamDataMap);
if (CollectionUtils.isEmpty(unUseSpaceIds)) {
return;
@@ -230,35 +236,39 @@ public class ConfigManager {
unUseSpaceIds.forEach(this::stopAndRemoveListenerAndJob);
}
- private void logUnusefulSpaceIds(List<Long> unUseSpaceIds) {
- log.info("[listening namespace] The space ID that needs to be stopped:
{}", gson.toJson(unUseSpaceIds));
+ private void logUnusefulSpaceIds(List<Long> unUseSpaceIds) throws
Exception {
+ log.info("[listening namespace] The space ID that needs to be stopped:
{}", objectMapper.writeValueAsString(unUseSpaceIds));
logExistingListeners();
}
- private void logExistingListeners() {
+ private void logExistingListeners() throws Exception {
List<Long> listenerKeys = new ArrayList<>(listeners.keySet());
- log.info("[listening namespace] all listeners already exist: {}",
gson.toJson(listenerKeys));
+ log.info("[listening namespace] all listeners already exist: {}",
objectMapper.writeValueAsString(listenerKeys));
}
private void stopAndRemoveListenerAndJob(Long stopSpaceId) {
- MilogConfigListener spaceConfigListener = listeners.get(stopSpaceId);
+ try {
+ MilogConfigListener spaceConfigListener =
listeners.get(stopSpaceId);
- if (spaceConfigListener != null) {
- log.info("stopping the space ID: {}", stopSpaceId);
- spaceConfigListener.shutdown();
- log.info("Removing stopSpaceId: {} log tail config listener",
stopSpaceId);
- listeners.remove(stopSpaceId);
- } else {
- log.warn("No space ID in the current listener is ready to be
stopped: {}", stopSpaceId);
+ if (spaceConfigListener != null) {
+ log.info("stopping the space ID: {}", stopSpaceId);
+ spaceConfigListener.shutdown();
+ log.info("Removing stopSpaceId: {} log tail config listener",
stopSpaceId);
+ listeners.remove(stopSpaceId);
+ } else {
+ log.warn("No space ID in the current listener is ready to be
stopped: {}", stopSpaceId);
+ }
+ stopJob(stopSpaceId, spaceConfigListener);
+ } catch (Exception e) {
+ log.error("error stopping space ID: {}", stopSpaceId, e);
}
- stopJob(stopSpaceId, spaceConfigListener);
}
- private void stopJob(Long stopSpaceId, MilogConfigListener
spaceConfigListener) {
+ private void stopJob(Long stopSpaceId, MilogConfigListener
spaceConfigListener) throws JsonProcessingException {
MilogSpaceData milogSpaceData = milogSpaceDataMap.get(stopSpaceId);
- if (milogSpaceData != null) {
+ if (milogSpaceData != null && null != spaceConfigListener) {
spaceConfigListener.getJobManager().closeJobs(milogSpaceData);
log.info("Closing stopSpaceId: {} logTail consumer job",
stopSpaceId);
} else {
@@ -272,21 +282,25 @@ public class ConfigManager {
* 1. The listener corresponds to the data ID
* 2. start job
*
- * @param milogStreamDataMap
*/
public void startNewListenerAndJob(Map<Long, String> milogStreamDataMap) {
milogStreamDataMap.forEach((spaceId, dataId) -> {
// there is no listener corresponding to the spaceId in memory
- if (!existListener(spaceId)) {
- log.info("startNewListenerAndJob for spaceId:{}", spaceId);
- // Get a copy of the spaceData configuration through the
dataID and put it in the configListener cache
- MilogSpaceData milogSpaceData = getMilogSpaceData(dataId);
+ try {
+ if (!existListener(spaceId)) {
+ log.info("startNewListenerAndJob for spaceId start:{}",
spaceId);
+ // Get a copy of the spaceData configuration through the
dataID and put it in the configListener cache
+ MilogSpaceData milogSpaceData = getMilogSpaceData(dataId);
- // Listen configuration
- MilogConfigListener configListener = new
MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData,
nacosConfig);
- addListener(spaceId, configListener);
+ // Listen configuration
+ MilogConfigListener configListener = new
MilogConfigListener(spaceId, dataId, DEFAULT_GROUP_ID, milogSpaceData,
nacosConfig);
+ addListener(spaceId, configListener);
- milogSpaceDataMap.put(spaceId, milogSpaceData);
+ milogSpaceDataMap.put(spaceId, milogSpaceData);
+ log.info("startNewListenerAndJob for spaceId
end:{},dataId:{}", spaceId, dataId);
+ }
+ } catch (Exception e) {
+ log.error(String.format("startNewListenerAndJob for spaceId
error:%s", spaceId), e);
}
});
}
@@ -295,7 +309,7 @@ public class ConfigManager {
try {
// get a copy of the spaceData configuration through the data ID
String logSpaceDataStr = nacosConfig.getConfigStr(dataId,
DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
- return StringUtils.isNotEmpty(logSpaceDataStr) ?
GSON.fromJson(logSpaceDataStr, MilogSpaceData.class) : new MilogSpaceData();
+ return StringUtils.isNotEmpty(logSpaceDataStr) ?
MAPPER.readValue(logSpaceDataStr, MilogSpaceData.class) : new MilogSpaceData();
} catch (Exception e) {
log.error("Failed to get MilogSpaceData for dataId: {}", dataId,
e);
// Handle the exception by providing a default logSpaceData
@@ -303,8 +317,4 @@ public class ConfigManager {
}
}
- public ConcurrentHashMap<Long, MilogSpaceData> getMilogSpaceDataMap() {
- return milogSpaceDataMap;
- }
-
}
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index b0d31c9e..f0e8a51b 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -315,8 +315,8 @@ public class MilogConfigListener {
if (StringUtils.equals(originConfig, dataValue)) {
return;
}
- originConfig = dataValue;
log.info("listen tail received a configuration
request:{},origin config:{}", dataValue, originConfig);
+ originConfig = dataValue;
if (StringUtils.isNotEmpty(dataValue) &&
!Constant.NULLVALUE.equals(dataValue)) {
dataValue =
streamCommonExtension.dataPreProcess(dataValue);
MilogSpaceData newMilogSpaceData =
GSON.fromJson(dataValue, MilogSpaceData.class);
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index 41665703..c650c267 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -18,7 +18,8 @@
*/
package org.apache.ozhera.log.stream.job;
-import com.google.gson.Gson;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.xiaomi.youpin.docean.Ioc;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -55,7 +56,7 @@ public class JobManager {
private String sinkJobType;
- private Gson gson = new Gson();
+ private ObjectMapper objectMapper = new ObjectMapper();
public JobManager() {
sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
@@ -63,9 +64,9 @@ public class JobManager {
jobs = new ConcurrentHashMap<>();
}
- public void closeJobs(MilogSpaceData milogSpaceData) {
+ public void closeJobs(MilogSpaceData milogSpaceData) throws
JsonProcessingException {
List<SinkConfig> configList = milogSpaceData.getSpaceConfig();
- log.info("tasks that are already running:{},The task that is about to
be shut down:{}", gson.toJson(jobs), gson.toJson(milogSpaceData));
+ log.info("tasks that are already running:{},The task that is about to
be shut down:{}", objectMapper.writeValueAsString(jobs),
objectMapper.writeValueAsString(milogSpaceData));
if (CollectionUtils.isNotEmpty(configList)) {
for (SinkConfig sinkConfig : configList) {
List<LogtailConfig> tailConfigs =
sinkConfig.getLogtailConfigs();
@@ -189,7 +190,7 @@ public class JobManager {
String clusterInfo = logtailConfig.getClusterInfo();
String type = logtailConfig.getType();
if (StringUtils.isEmpty(clusterInfo) ||
StringUtils.isEmpty(logtailConfig.getTopic())) {
- log.info("start job error,ak or sk or logTailConfig
null,ak:{},sk:{},logTailConfig:{}", ak, sk, gson.toJson(logtailConfig));
+ log.info("start job error,ak or sk or logTailConfig
null,ak:{},sk:{},logTailConfig:{}", ak, sk,
objectMapper.writeValueAsString(logtailConfig));
return;
}
startConsumerJob(type, ak, sk, clusterInfo, logtailConfig,
sinkConfig, logSpaceId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]