This is an automated email from the ASF dual-hosted git repository.
dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new a507e8ad feat: log parsing and system enhancement update (#604)
a507e8ad is described below
commit a507e8ad29de614138cb2eb7a28ec885d58b4100
Author: wtt <[email protected]>
AuthorDate: Thu Sep 4 14:03:11 2025 +0800
feat: log parsing and system enhancement update (#604)
* feat: optimize log acquisition thread pool and add new features
* feat: optimize log parsing and processing functions
* refactor: remove the version number in pom.xml
---
.../ozhera/log/agent/channel/ChannelEngine.java | 7 +-
.../channel/memory/AgentMemoryServiceImpl.java | 2 +-
.../ozhera/log/agent/common/ExecutorUtil.java | 11 +-
ozhera-log/log-api/pom.xml | 14 +-
.../ozhera/log/api/model/msg/LineMessage.java | 2 +
.../apache/ozhera/log/parse/PlaceholderParser.java | 182 +++++++++++++--------
.../apache/ozhera/log/common/LogParserTest.java | 10 +-
.../apache/ozhera/log/common/PathUtilsTest.java | 14 +-
.../org/apache/ozhera/log/manager/domain/Tpc.java | 43 +++--
.../log/manager/model/pojo/MilogLogTailDo.java | 6 +-
.../ozhera/log/stream/config/ConfigManager.java | 35 ++--
.../apache/ozhera/log/stream/job/JobManager.java | 2 +
.../ozhera/log/stream/job/LogDataTransfer.java | 6 +
.../ozhera/log/stream/plugin/es/EsPlugin.java | 6 +-
.../plugin/nacos/LevelFilterConfigListener.java | 2 +-
15 files changed, 222 insertions(+), 120 deletions(-)
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index a1c3b53d..1d29f573 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -73,7 +73,7 @@ import static org.apache.ozhera.log.common.Constant.GSON;
@Service
@Slf4j
public class ChannelEngine {
- private AgentMemoryService agentMemoryService;
+ private volatile AgentMemoryService agentMemoryService;
private ChannelDefineLocator channelDefineLocator;
/**
@@ -91,7 +91,7 @@ public class ChannelEngine {
private String memoryBasePath;
- private Gson gson = GSON;
+ private final Gson gson = GSON;
@Getter
private volatile boolean initComplete;
@@ -117,7 +117,7 @@ public class ChannelEngine {
channelServiceFactory = new
ChannelServiceFactory(agentMemoryService, memoryBasePath);
log.info("query channelDefineList:{}",
gson.toJson(channelDefineList));
- channelServiceList = channelDefineList.stream()
+ channelServiceList = channelDefineList.parallelStream()
.filter(channelDefine ->
filterCollStart(channelDefine.getAppName()))
.map(channelDefine -> {
ChannelService channelService =
this.channelServiceTrans(channelDefine);
@@ -270,6 +270,7 @@ public class ChannelEngine {
if (null == agentMemoryService) {
agentMemoryService = new
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
AgentMemoryService.DEFAULT_BASE_PATH));
}
+ log.info("channelServiceTrans,channelDefine,channelId:{}",
channelDefine.getChannelId());
return channelServiceFactory.createChannelService(channelDefine,
exporter, filterChain);
} catch (Throwable e) {
log.error("channelServiceTrans exception, channelDefine:{}",
gson.toJson(channelDefine), e);
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
index 0a479995..e577dfd9 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
@@ -127,7 +127,7 @@ public class AgentMemoryServiceImpl implements
AgentMemoryService {
}
@Override
- public List<ChannelMemory> restoreFromDisk() {
+ public List<ChannelMemory> restoreFromDisk() {
List<ChannelMemory> channelMemoryList = new ArrayList<>();
File file = new File(this.basePath + MEMORY_DIR);
File[] fs = file.listFiles();
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index 7d5d0441..dce80f31 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -19,7 +19,10 @@
package org.apache.ozhera.log.agent.common;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.ConfigUtils;
+import java.util.Objects;
import java.util.concurrent.*;
/**
@@ -47,7 +50,13 @@ public class ExecutorUtil {
}
public static ExecutorService createPool(String name) {
- System.setProperty("jdk.virtualThreadScheduler.parallelism",
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
+ String configValue =
ConfigUtils.getConfigValue("jdk.virtualThreadScheduler.parallelism");
+ if (StringUtils.isEmpty(configValue) || Objects.equals("-1",
configValue)) {
+ System.setProperty("jdk.virtualThreadScheduler.parallelism",
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
+ } else {
+ System.setProperty("jdk.virtualThreadScheduler.parallelism",
configValue);
+ }
+ log.info("jdk.virtualThreadScheduler.parallelism:{}", configValue);
ThreadFactory factory = Thread.ofVirtual().name("ExecutorUtil-" + name
+ "-Virtual-Thread", 0)
.uncaughtExceptionHandler((t, e) ->
log.error("ExecutorUtil-TP-Virtual-Thread uncaughtException:{}",
e.getMessage(), e)).factory();
return Executors.newThreadPerTaskExecutor(factory);
diff --git a/ozhera-log/log-api/pom.xml b/ozhera-log/log-api/pom.xml
index aef2be5a..d69b7cf2 100644
--- a/ozhera-log/log-api/pom.xml
+++ b/ozhera-log/log-api/pom.xml
@@ -19,7 +19,8 @@ http://www.apache.org/licenses/LICENSE-2.0
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.ozhera</groupId>
<artifactId>ozhera-log</artifactId>
@@ -33,6 +34,17 @@ http://www.apache.org/licenses/LICENSE-2.0
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ </dependencies>
+
<build>
<resources>
<resource>
diff --git
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
index 119cafa3..ae81d570 100644
---
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
+++
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
@@ -18,6 +18,7 @@
*/
package org.apache.ozhera.log.api.model.msg;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.ToString;
import java.io.Serializable;
@@ -28,6 +29,7 @@ import java.util.Map;
* @author shanwb
* @date 2021-07-19
*/
+@JsonIgnoreProperties(ignoreUnknown = true)
@ToString
public class LineMessage implements Serializable {
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
index 307fc3d4..6987a1ab 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
@@ -21,35 +21,58 @@ package org.apache.ozhera.log.parse;
import java.util.*;
/**
+ *
+ * 支持:
+ * 1. %s 普通字段
+ * 2. %s[n@sep] 合并 n 个 token
+ * 3. %s[-] 丢弃字段
+ * 4. 只在相邻两个 %s 之间的方括号可跳过,里面内容可提取
+ *
* @author wtt
* @version 1.0
* @description
* @date 2025/4/7 15:36
*/
public class PlaceholderParser extends AbstractLogParser {
- private final String parseScript;
private final List<String> fieldNames;
- private final List<String> staticParts;
+ private final List<ScriptPart> parts;
public PlaceholderParser(LogParserData parserData) {
super(parserData);
- parseScript = parserData.getParseScript();
- fieldNames = valueMap.entrySet().stream()
+ String parseScript = parserData.getParseScript();
+ this.fieldNames = valueMap.entrySet().stream()
.sorted(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.toList();
- this.staticParts = splitAndFormat();
+ this.parts = splitScript(parseScript);
}
- private List<String> splitAndFormat() {
- List<String> parts = new ArrayList<>();
- String[] tokens = parseScript.split("%s", -1);
- for (String token : tokens) {
- if (!token.isEmpty()) {
- parts.add(token);
+ /** 拆分脚本为占位符和静态部分 */
+ private List<ScriptPart> splitScript(String script) {
+ List<ScriptPart> list = new ArrayList<>();
+ int idx = 0;
+ while (idx < script.length()) {
+ int next = script.indexOf("%s", idx);
+ if (next == -1) {
+ list.add(new ScriptPart(script.substring(idx), false, null));
+ break;
+ }
+ if (next > idx) {
+ list.add(new ScriptPart(script.substring(idx, next), false,
null));
+ }
+ // 检查 modifier
+ String modifier = null;
+ if (next + 2 < script.length() && script.charAt(next + 2) == '[') {
+ int end = script.indexOf(']', next + 2);
+ if (end != -1) {
+ modifier = script.substring(next + 3, end);
+ next = end;
+ }
}
+ list.add(new ScriptPart("%s", true, modifier));
+ idx = next + 2;
}
- return parts;
+ return list;
}
@Override
@@ -64,43 +87,7 @@ public class PlaceholderParser extends AbstractLogParser {
@Override
public List<String> parseLogData(String logData) throws Exception {
- List<String> result = new ArrayList<>();
- String remaining = logData;
- int staticPartIndex = 0;
-
- try {
- if (!staticParts.isEmpty() &&
remaining.startsWith(staticParts.get(0))) {
- remaining = remaining.substring(staticParts.get(0).length());
- staticPartIndex++;
- }
-
- while (staticPartIndex < staticParts.size()) {
- String nextStatic = staticParts.get(staticPartIndex);
- int endPos = remaining.indexOf(nextStatic);
-
- if (endPos == -1) {
- if (!remaining.trim().isEmpty()) {
- result.add(remaining.trim());
- }
- break;
- }
-
- String fieldValue = remaining.substring(0, endPos).trim();
- if (!fieldValue.isEmpty()) {
- result.add(fieldValue);
- }
-
- remaining = remaining.substring(endPos + nextStatic.length());
- staticPartIndex++;
- }
-
- if (!remaining.trim().isEmpty()) {
- result.add(remaining.trim());
- }
-
- } catch (Exception ignored) {
- }
- return result;
+ return new
ArrayList<>(parse(logData).values().stream().map(Object::toString).toList());
}
@@ -108,36 +95,97 @@ public class PlaceholderParser extends AbstractLogParser {
Map<String, Object> result = new LinkedHashMap<>();
String remaining = logLine;
int fieldIndex = 0;
- int staticPartIndex = 0;
try {
- if (!staticParts.isEmpty() &&
remaining.startsWith(staticParts.get(0))) {
- remaining = remaining.substring(staticParts.get(0).length());
- staticPartIndex++;
- }
+ for (int i = 0; i < parts.size(); i++) {
+ ScriptPart part = parts.get(i);
+ if (part.isPlaceholder) {
+ // 检查下一个占位符之间是否有方括号
+ String bracketValue = null;
+ if (i + 1 < parts.size() && parts.get(i +
1).isPlaceholder) {
+ // 尝试提取 [] 内的内容
+ if (!remaining.isEmpty() && remaining.charAt(0) ==
'[') {
+ int endBracket = remaining.indexOf(']');
+ if (endBracket != -1) {
+ bracketValue = remaining.substring(1,
endBracket);
+ remaining = remaining.substring(endBracket +
1);
+ }
+ }
+ }
- while (fieldIndex < fieldNames.size()) {
- String nextStatic = staticPartIndex < staticParts.size() ?
staticParts.get(staticPartIndex) : null;
- int endPos = nextStatic != null ?
remaining.indexOf(nextStatic) : remaining.length();
+ String fieldValue;
+
+ // n@sep 处理
+ if (part.modifier != null &&
part.modifier.matches("\\d+@.*")) {
+ String[] arr = part.modifier.split("@", 2);
+ int count = Integer.parseInt(arr[0]);
+ String sep = arr[1];
+ List<String> tokens = new ArrayList<>();
+ String temp = remaining;
+ for (int j = 0; j < count; j++) {
+ int idxSep = temp.indexOf(sep);
+ if (idxSep == -1) {
+ tokens.add(temp);
+ temp = "";
+ break;
+ } else {
+ tokens.add(temp.substring(0, idxSep));
+ temp = temp.substring(idxSep + sep.length());
+ }
+ }
+ fieldValue = String.join(sep, tokens).trim();
+ remaining = temp;
+ } else {
+ // 普通 %s
+ String nextStatic = null;
+ for (int j = i + 1; j < parts.size(); j++) {
+ if (!parts.get(j).isPlaceholder) {
+ nextStatic = parts.get(j).text;
+ break;
+ }
+ }
+ int endPos = nextStatic != null ?
remaining.indexOf(nextStatic) : remaining.length();
+ if (endPos == -1) {
+ endPos = remaining.length();
+ }
+ fieldValue = remaining.substring(0, endPos).trim();
+ remaining = remaining.substring(endPos);
+ }
- if (endPos == -1) return result;
+ // 如果有方括号值,可以组合或存入 Map
+ if (bracketValue != null && !bracketValue.isEmpty()) {
+ fieldValue = bracketValue;
+ }
- String fieldValue = remaining.substring(0, endPos).trim();
- result.put(fieldNames.get(fieldIndex++), fieldValue);
- remaining = remaining.substring(endPos);
+ // 丢弃字段
+ if (!"-".equals(part.modifier) && fieldIndex <
fieldNames.size()) {
+ result.put(fieldNames.get(fieldIndex++), fieldValue);
+ }
- if (nextStatic != null) {
- if (!remaining.startsWith(nextStatic)) {
+ } else {
+ // 静态部分严格匹配
+ if (!remaining.startsWith(part.text)) {
return Collections.emptyMap();
}
- remaining = remaining.substring(nextStatic.length());
- staticPartIndex++;
+ remaining = remaining.substring(part.text.length());
}
}
-
- return result;
} catch (Exception e) {
return Collections.emptyMap();
}
+
+ return result;
+ }
+
+ private static class ScriptPart {
+ String text;
+ boolean isPlaceholder;
+ String modifier;
+
+ public ScriptPart(String text, boolean isPlaceholder, String modifier)
{
+ this.text = text;
+ this.isPlaceholder = isPlaceholder;
+ this.modifier = modifier;
+ }
}
}
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index befc7e06..d419cf66 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -145,11 +145,11 @@ public class LogParserTest {
@Test
public void LogPlaceholderParserTest() throws Exception {
Stopwatch stopwatch = Stopwatch.createStarted();
- String keyList =
"level:keyword,message:text,message_body:text,hostname:keyword,trace_id:keyword,thread:keyword,timestamp:date,time:date,linenumber:long,mqtag:keyword,tail:keyword,filename:keyword,logstore:keyword,logsource:keyword,logip:keyword,mqtopic:keyword";
- String keyOrderList =
"level:1,message:1,message_body:1,hostname:1,trace_id:1,thread:1,timestamp:1,time:1,linenumber:3,mqtag:3,tail:3,filename:3,logstore:3,logsource:3,logip:3,mqtopic:3";
- String valueList = "2,5,-1,1,3,4,-1,0";
- String parseScript = "[%s] [%s] [%s] [%s] [%s] %s %s";
- String logData = "[2025-04-09T10:56:55.259+08:00] [kfs-test-123]
[ERROR] [485bf6a9b5898ecdfd22696325b11b05] [DubboServerHandler-thread-495]
c.x.k.w.s.i.DataDictServiceImpl - cache is not found. CacheLoader returned null
for key DataDictServiceImpl$TenantKey@2f123a.";
+ String keyList =
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,level:keyword,time:keyword,trace_id:keyword,msg:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword,traceId:keyword";
+ String keyOrderList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,level:1,time:1,trace_id:1,msg:1,tailId:3,spaceId:3,storeId:3,deploySpace:3,traceId:1";
+ String valueList = "-1,-1,0,1,2,3,-1";
+ String parseScript = "[%s %s[2@ ] %s[-]] -[trace_id=%s] %s - %s";
+ String logData = "[INFO 2025-09-03 09:47:05.233 http-nio-1115-exec-4]
-[trace_id=47f8940234d777de50d17685171a6183]
com.test.update.web.controller.v0.UpdatesController - request info
d:sapphiren_id_global to test";
Long collectStamp = Instant.now().toEpochMilli();
Integer parserType =
LogParserFactory.LogParserEnum.PLACEHOLDER_PARSE.getCode();
LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName,
keyOrderList);
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
index 022f04de..b2ad8170 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
@@ -47,13 +47,13 @@ public class PathUtilsTest {
@Test
public void test00() {
- String logPattern =
"/home/work/logs/neo-logs/(a|b)/applogs/mi_com_event/mi_com_event.log";
+ String logPattern = "/home/work/log/log-test/info-*.log";
List<String> watches = PathUtils.parseWatchDirectory(logPattern);
System.out.println(new Gson().toJson(watches));
- Assert.assertEquals(2, watches.size());
- Assert.assertEquals("/home/work/logs/neo-logs/a/applogs",
watches.get(0));
- Assert.assertEquals("/home/work/logs/neo-logs/b/applogs",
watches.get(1));
+ Assert.assertEquals(1, watches.size());
+ Assert.assertEquals("/home/work/log/log-test/info-1.log", logPattern);
+ Assert.assertEquals("/home/work/logs/neo-logs/b/applogs", logPattern);
}
@Test
@@ -313,10 +313,10 @@ public class PathUtilsTest {
@Test
public void testLogPattern1() {
- String logSplitExpress =
"/home/work/logs/neo-logs/(eventapi-stable-66fd975598-z8fd9|eventapi-stable-66fd975598-zfhwq|eventapi-ams-runscript-stable-5c76f54c68-lrmcl|eventapi-stable-66fd975598-l7c4s|eventapi-stable-66fd975598-ckwwd)/applogs/mi_com_event.log-.*";
+ String logSplitExpress = "/home/work/log/log-test/info-.*.log";
Pattern pattern = Pattern.compile(logSplitExpress);
- Assert.assertEquals(true,
pattern.matcher("/home/work/logs/neo-logs/eventapi-stable-66fd975598-z8fd9/applogs/mi_com_event.log-2022-01-11-16").matches());
- Assert.assertEquals(true,
pattern.matcher("/home/work/logs/neo-logs/eventapi-ams-runscript-stable-5c76f54c68-lrmcl/applogs/mi_com_event.log-2022-01-11-16").matches());
+ Assert.assertEquals(true,
pattern.matcher("/home/work/log/log-test/info-1.log").matches());
+ Assert.assertEquals(false,
pattern.matcher("/home/work/logs/neo-logs/info-2.log").matches());
}
@Test
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
index 39657110..52a61d9d 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
@@ -19,12 +19,6 @@
package org.apache.ozhera.log.manager.domain;
import com.google.common.collect.Lists;
-import org.apache.ozhera.log.manager.common.context.MoneUserContext;
-import org.apache.ozhera.log.manager.common.exception.MilogManageException;
-import org.apache.ozhera.log.manager.dao.MilogSpaceDao;
-import org.apache.ozhera.log.manager.model.MilogSpaceParam;
-import org.apache.ozhera.log.manager.model.pojo.MilogSpaceDO;
-import org.apache.ozhera.log.manager.user.MoneUser;
import com.xiaomi.mone.tpc.api.service.NodeFacade;
import com.xiaomi.mone.tpc.api.service.NodeUserFacade;
import com.xiaomi.mone.tpc.api.service.UserOrgFacade;
@@ -33,6 +27,7 @@ import com.xiaomi.mone.tpc.common.enums.NodeTypeEnum;
import com.xiaomi.mone.tpc.common.enums.OutIdTypeEnum;
import com.xiaomi.mone.tpc.common.enums.UserTypeEnum;
import com.xiaomi.mone.tpc.common.param.*;
+import com.xiaomi.mone.tpc.common.vo.NodeUserRelVo;
import com.xiaomi.mone.tpc.common.vo.NodeVo;
import com.xiaomi.mone.tpc.common.vo.OrgInfoVo;
import com.xiaomi.mone.tpc.common.vo.PageDataVo;
@@ -43,12 +38,19 @@ import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
import com.xiaomi.youpin.infra.rpc.Result;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.ozhera.log.manager.common.context.MoneUserContext;
+import org.apache.ozhera.log.manager.common.exception.MilogManageException;
+import org.apache.ozhera.log.manager.dao.MilogSpaceDao;
+import org.apache.ozhera.log.manager.model.MilogSpaceParam;
+import org.apache.ozhera.log.manager.model.pojo.MilogSpaceDO;
+import org.apache.ozhera.log.manager.user.MoneUser;
import javax.annotation.Resource;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.ozhera.log.common.Constant.DEFAULT_OPERATOR;
import static org.apache.ozhera.log.common.Constant.GSON;
@Service
@@ -251,27 +253,21 @@ public class Tpc {
tpcUserService.add(add);
}
- public NodeVo getByOuterId(Long id, Integer outType) {
+ public NodeVo getByOuterId(Long id, String account) {
MoneUser currentUser = MoneUserContext.getCurrentUser();
NodeQryParam param = new NodeQryParam();
- param.setPager(false);
- param.setAccount(currentUser == null ? "system" :
currentUser.getUser());
+ param.setAccount(StringUtils.isNotBlank(account) ? account :
currentUser.getUser());
param.setUserType(UserTypeEnum.CAS_TYPE.getCode());
- param.setType(NodeTypeEnum.PRO_SUB_GROUP.getCode());
- if (null != currentUser) {
- param.setUserType(currentUser.getUserType());
- }
param.setStatus(NodeStatusEnum.ENABLE.getCode());
param.setOutIdType(OutIdTypeEnum.SPACE.getCode());
param.setOutId(id);
param.setMyNode(false);
Result<NodeVo> nodeVoResult = tpcService.getByOutId(param);
- NodeVo data = nodeVoResult.getData();
- return data;
+ return nodeVoResult.getData();
}
public NodeVo getSpaceByOuterId(Long id) {
- return getByOuterId(id, OutIdTypeEnum.SPACE.getCode());
+ return getByOuterId(id, "");
}
public String getSpaceLastOrg(Long id) {
@@ -295,4 +291,19 @@ public class Tpc {
return res.getData();
}
+ public List<NodeUserRelVo> querySpaceMember(Long spaceId) {
+ NodeVo nodeVo = getByOuterId(spaceId, DEFAULT_OPERATOR);
+ if (null != nodeVo) {
+ NodeUserQryParam param = new NodeUserQryParam();
+ param.setNodeId(nodeVo.getId());
+ param.setAccount(DEFAULT_OPERATOR);
+ param.setUserType(UserTypeEnum.CAS_TYPE.getCode());
+ Result<PageDataVo<NodeUserRelVo>> nodeUserResult =
tpcUserService.list(param);
+ PageDataVo<NodeUserRelVo> nodeUserResultData =
nodeUserResult.getData();
+ return nodeUserResultData.getList();
+ }
+ return Lists.newArrayList();
+ }
+
+
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
index c3824a54..08ed3bc2 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
@@ -19,11 +19,11 @@
package org.apache.ozhera.log.manager.model.pojo;
import com.alibaba.fastjson.annotation.JSONField;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import org.apache.ozhera.log.api.model.meta.FilterDefine;
import org.apache.ozhera.log.manager.model.BaseCommon;
import org.apache.ozhera.log.manager.model.dto.MotorRoomDTO;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
import org.nutz.dao.entity.annotation.*;
import org.nutz.json.JsonField;
@@ -155,7 +155,7 @@ public class MilogLogTailDo extends BaseCommon {
@Column(value = "collection_ready")
@ColDefine(type = ColType.VARCHAR, width = 1024)
@Comment("start to ready coll")
- private Boolean collectionReady;
+ private Boolean collectionReady = true;
@Column(value = "origin_system")
@ColDefine(type = ColType.VARCHAR, width = 1024)
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
index 58c136fd..833b3961 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
@@ -59,6 +59,9 @@ public class ConfigManager {
@Value("$hera.stream.monitor_space_data_id")
private String spaceDataId;
+ @Value("$hera.stream.type")
+ private String streamType;
+
//final String spaceDataId = LOG_MANAGE_PREFIX + NAMESPACE_CONFIG_DATA_ID;
/**
@@ -177,11 +180,25 @@ public class ConfigManager {
log.warn("listen dataID:{},groupId:{},but receive config is
empty,uniqueMarks:{}", spaceDataId, DEFAULT_GROUP_ID, uniqueMark);
return;
}
- Map<Long, String> dataIdMap =
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
- log.info("uniqueMark:{},data key:{}", uniqueMark,
objectMapper.writeValueAsString(dataIdMap.keySet()));
+ if ("all".equals(streamType)) {
+ for (Map.Entry<String, Map<Long, String>> entry :
config.entrySet()) {
+ Map<Long, String> dataIdMap = entry.getValue();
+ log.info("uniqueMark:{},data key:{}", uniqueMark,
objectMapper.writeValueAsString(dataIdMap.keySet()));
+ processDataIdMapWithLock(dataIdMap);
+ }
+ } else {
+ Map<Long, String> dataIdMap =
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
+ log.info("uniqueMark:{},data key:{}", uniqueMark,
objectMapper.writeValueAsString(dataIdMap.keySet()));
+ processDataIdMapWithLock(dataIdMap);
+ }
+ }
+
+ private void processDataIdMapWithLock(Map<Long, String> dataIdMap) throws
Exception {
if (spaceLock.tryLock()) {
try {
- stopUnusefulListenerAndJob(dataIdMap);
+ if (!"all".equals(streamType)) {
+ stopUnusefulListenerAndJob(dataIdMap);
+ }
startNewListenerAndJob(dataIdMap);
} finally {
spaceLock.unlock();
@@ -196,16 +213,6 @@ public class ConfigManager {
return Ioc.ins().getBean(factualServiceName);
}
- /**
- * The new {spaceid,dataid} A is compared to {spaceid,dataid} B in memory
to filter out the sets A-B
- *
- * @param spaceId
- * @return
- */
- public boolean existListener(Long spaceId) {
- return milogSpaceDataMap.containsKey(spaceId);
- }
-
/**
* The new {spaceId,dataId} A is compared to {spaceId,dataId} B in memory
and filtered out the set B-A
*
@@ -287,7 +294,7 @@ public class ConfigManager {
milogStreamDataMap.forEach((spaceId, dataId) -> {
// there is no listener corresponding to the spaceId in memory
try {
- if (!existListener(spaceId)) {
+ if (!milogSpaceDataMap.containsKey(spaceId)) {
log.info("startNewListenerAndJob for spaceId start:{}",
spaceId);
// Get a copy of the spaceData configuration through the
dataID and put it in the configListener cache
MilogSpaceData milogSpaceData = getMilogSpaceData(dataId);
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index c650c267..a7357809 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -20,6 +20,7 @@ package org.apache.ozhera.log.stream.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
import com.xiaomi.youpin.docean.Ioc;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -62,6 +63,7 @@ public class JobManager {
sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
sinkChain = Ioc.ins().getBean(SinkChain.class);
jobs = new ConcurrentHashMap<>();
+ objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
}
public void closeJobs(MilogSpaceData milogSpaceData) throws
JsonProcessingException {
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
index c312e965..bb31de93 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
@@ -140,6 +140,12 @@ public class LogDataTransfer {
return objectMapper.readValue(msg, LineMessage.class);
}
+ public static void main(String[] args) throws JsonProcessingException {
+ String message =
"{\"extMap\":{\"ct\":\"1756780824719\",\"ip\":\"10.7.84.220\",\"tag\":\"tags_392_120935_132193\",\"type\":\"1\"},\"fileName\":\"/home/work/log/nr-promotion-promotion-admin-global-1080348-c-67bbfb479c-27t45/promotion-admin-global/server.log\",\"lineNumber\":2642,\"msgBody\":\"2025-09-02
10:40:21,610|INFO
|4a1378575e03c5d38fa9fd0f7351227a|call_client213|c.x.n.p.a.g.i.u.filter.DubboCommonFilter|dubbo
invoke. service:com.xiaomi.nr.promotion.admin.global.impl.DubboHea [...]
+ LineMessage lineMessage = new ObjectMapper().readValue(message,
LineMessage.class);
+ System.out.println(lineMessage.getTimestamp());
+ }
+
private void putCommonData(Map<String, Object> dataMap) {
dataMap.putIfAbsent(LOG_STREAM_SPACE_ID,
sinkJobConfig.getLogSpaceId());
dataMap.putIfAbsent(LOG_STREAM_STORE_ID,
sinkJobConfig.getLogStoreId());
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
index f33fc498..cad12b8b 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
@@ -150,11 +150,15 @@ public class EsPlugin {
}
MqMessageDTO MqMessageDTO = new MqMessageDTO();
MqMessageDTO.setEsInfo(esInfo);
+ AtomicInteger count = new AtomicInteger();
List<MqMessageDTO.CompensateMqDTO> compensateMqDTOS =
Lists.newArrayList();
request.requests().stream().filter(x -> x instanceof IndexRequest)
.forEach(x -> {
+ int currentNum = count.incrementAndGet();
Map source = ((IndexRequest) x).sourceAsMap();
- log.error("Failure to handle index:[{}], type:[{}],id:[{}]
data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source));
+ if (currentNum == 1 || currentNum % 600 == 0) {
+ log.error("Failure to handle index:[{}],
type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(),
JSON.toJSONString(source));
+ }
MqMessageDTO.CompensateMqDTO compensateMqDTO = new
MqMessageDTO.CompensateMqDTO();
compensateMqDTO.setMsg(JSON.toJSONString(source));
compensateMqDTO.setEsIndex(x.index());
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
index 1d376cf2..71b6b357 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
@@ -58,7 +58,7 @@ public class LevelFilterConfigListener {
ScheduledExecutorService scheduledExecutor = Executors
.newSingleThreadScheduledExecutor(ThreadUtil.newNamedThreadFactory("log-level-filter",
false));
scheduledExecutor.scheduleAtFixedRate(() ->
- SafeRun.run(() -> configChangeOperate()), 0, 1,
TimeUnit.MINUTES);
+ SafeRun.run(this::configChangeOperate), 0, 1,
TimeUnit.MINUTES);
}
private void configChangeOperate() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]