This is an automated email from the ASF dual-hosted git repository.

dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new a507e8ad feat: log parsing and system enhancement update (#604)
a507e8ad is described below

commit a507e8ad29de614138cb2eb7a28ec885d58b4100
Author: wtt <[email protected]>
AuthorDate: Thu Sep 4 14:03:11 2025 +0800

    feat: log parsing and system enhancement update (#604)
    
    * feat: optimize log acquisition thread pool and add new features
    
    * feat: log optimize log parsing and processing functions
    
    * refactor: remove the version number in pom xml
---
 .../ozhera/log/agent/channel/ChannelEngine.java    |   7 +-
 .../channel/memory/AgentMemoryServiceImpl.java     |   2 +-
 .../ozhera/log/agent/common/ExecutorUtil.java      |  11 +-
 ozhera-log/log-api/pom.xml                         |  14 +-
 .../ozhera/log/api/model/msg/LineMessage.java      |   2 +
 .../apache/ozhera/log/parse/PlaceholderParser.java | 182 +++++++++++++--------
 .../apache/ozhera/log/common/LogParserTest.java    |  10 +-
 .../apache/ozhera/log/common/PathUtilsTest.java    |  14 +-
 .../org/apache/ozhera/log/manager/domain/Tpc.java  |  43 +++--
 .../log/manager/model/pojo/MilogLogTailDo.java     |   6 +-
 .../ozhera/log/stream/config/ConfigManager.java    |  35 ++--
 .../apache/ozhera/log/stream/job/JobManager.java   |   2 +
 .../ozhera/log/stream/job/LogDataTransfer.java     |   6 +
 .../ozhera/log/stream/plugin/es/EsPlugin.java      |   6 +-
 .../plugin/nacos/LevelFilterConfigListener.java    |   2 +-
 15 files changed, 222 insertions(+), 120 deletions(-)

diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index a1c3b53d..1d29f573 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -73,7 +73,7 @@ import static org.apache.ozhera.log.common.Constant.GSON;
 @Service
 @Slf4j
 public class ChannelEngine {
-    private AgentMemoryService agentMemoryService;
+    private volatile AgentMemoryService agentMemoryService;
 
     private ChannelDefineLocator channelDefineLocator;
     /**
@@ -91,7 +91,7 @@ public class ChannelEngine {
 
     private String memoryBasePath;
 
-    private Gson gson = GSON;
+    private final Gson gson = GSON;
 
     @Getter
     private volatile boolean initComplete;
@@ -117,7 +117,7 @@ public class ChannelEngine {
             channelServiceFactory = new 
ChannelServiceFactory(agentMemoryService, memoryBasePath);
 
             log.info("query channelDefineList:{}", 
gson.toJson(channelDefineList));
-            channelServiceList = channelDefineList.stream()
+            channelServiceList = channelDefineList.parallelStream()
                     .filter(channelDefine -> 
filterCollStart(channelDefine.getAppName()))
                     .map(channelDefine -> {
                         ChannelService channelService = 
this.channelServiceTrans(channelDefine);
@@ -270,6 +270,7 @@ public class ChannelEngine {
             if (null == agentMemoryService) {
                 agentMemoryService = new 
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
 AgentMemoryService.DEFAULT_BASE_PATH));
             }
+            log.info("channelServiceTrans,channelDefine,channelId:{}", 
channelDefine.getChannelId());
             return channelServiceFactory.createChannelService(channelDefine, 
exporter, filterChain);
         } catch (Throwable e) {
             log.error("channelServiceTrans exception, channelDefine:{}", 
gson.toJson(channelDefine), e);
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
index 0a479995..e577dfd9 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/AgentMemoryServiceImpl.java
@@ -127,7 +127,7 @@ public class AgentMemoryServiceImpl implements 
AgentMemoryService {
     }
 
     @Override
-    public List<ChannelMemory> restoreFromDisk() {
+    public List<ChannelMemory>  restoreFromDisk() {
         List<ChannelMemory> channelMemoryList = new ArrayList<>();
         File file = new File(this.basePath + MEMORY_DIR);
         File[] fs = file.listFiles();
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index 7d5d0441..dce80f31 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -19,7 +19,10 @@
 package org.apache.ozhera.log.agent.common;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.ConfigUtils;
 
+import java.util.Objects;
 import java.util.concurrent.*;
 
 /**
@@ -47,7 +50,13 @@ public class ExecutorUtil {
     }
 
     public static ExecutorService createPool(String name) {
-        System.setProperty("jdk.virtualThreadScheduler.parallelism", 
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
+        String configValue = 
ConfigUtils.getConfigValue("jdk.virtualThreadScheduler.parallelism");
+        if (StringUtils.isEmpty(configValue) || Objects.equals("-1", 
configValue)) {
+            System.setProperty("jdk.virtualThreadScheduler.parallelism", 
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
+        } else {
+            System.setProperty("jdk.virtualThreadScheduler.parallelism", 
configValue);
+        }
+        log.info("jdk.virtualThreadScheduler.parallelism:{}", configValue);
         ThreadFactory factory = Thread.ofVirtual().name("ExecutorUtil-" + name 
+ "-Virtual-Thread", 0)
                 .uncaughtExceptionHandler((t, e) -> 
log.error("ExecutorUtil-TP-Virtual-Thread uncaughtException:{}", 
e.getMessage(), e)).factory();
         return Executors.newThreadPerTaskExecutor(factory);
diff --git a/ozhera-log/log-api/pom.xml b/ozhera-log/log-api/pom.xml
index aef2be5a..d69b7cf2 100644
--- a/ozhera-log/log-api/pom.xml
+++ b/ozhera-log/log-api/pom.xml
@@ -19,7 +19,8 @@ http://www.apache.org/licenses/LICENSE-2.0
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.ozhera</groupId>
         <artifactId>ozhera-log</artifactId>
@@ -33,6 +34,17 @@ http://www.apache.org/licenses/LICENSE-2.0
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
+    <dependencies>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+    </dependencies>
+
     <build>
         <resources>
             <resource>
diff --git a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
index 119cafa3..ae81d570 100644
--- a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
+++ b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.ozhera.log.api.model.msg;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.ToString;
 
 import java.io.Serializable;
@@ -28,6 +29,7 @@ import java.util.Map;
  * @author shanwb
  * @date 2021-07-19
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
 @ToString
 public class LineMessage implements Serializable {
 
diff --git a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
index 307fc3d4..6987a1ab 100644
--- a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
+++ b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
@@ -21,35 +21,58 @@ package org.apache.ozhera.log.parse;
 import java.util.*;
 
 /**
+ *
+ * 支持:
+ * 1. %s 普通字段
+ * 2. %s[n@sep] 合并 n 个 token
+ * 3. %s[-] 丢弃字段
+ * 4. 只在相邻两个 %s 之间的方括号可跳过,里面内容可提取
+ *
  * @author wtt
  * @version 1.0
  * @description
  * @date 2025/4/7 15:36
  */
 public class PlaceholderParser extends AbstractLogParser {
-    private final String parseScript;
     private final List<String> fieldNames;
-    private final List<String> staticParts;
+    private final List<ScriptPart> parts;
 
     public PlaceholderParser(LogParserData parserData) {
         super(parserData);
-        parseScript = parserData.getParseScript();
-        fieldNames = valueMap.entrySet().stream()
+        String parseScript = parserData.getParseScript();
+        this.fieldNames = valueMap.entrySet().stream()
                 .sorted(Map.Entry.comparingByValue())
                 .map(Map.Entry::getKey)
                 .toList();
-        this.staticParts = splitAndFormat();
+        this.parts = splitScript(parseScript);
     }
 
-    private List<String> splitAndFormat() {
-        List<String> parts = new ArrayList<>();
-        String[] tokens = parseScript.split("%s", -1);
-        for (String token : tokens) {
-            if (!token.isEmpty()) {
-                parts.add(token);
+    /** 拆分脚本为占位符和静态部分 */
+    private List<ScriptPart> splitScript(String script) {
+        List<ScriptPart> list = new ArrayList<>();
+        int idx = 0;
+        while (idx < script.length()) {
+            int next = script.indexOf("%s", idx);
+            if (next == -1) {
+                list.add(new ScriptPart(script.substring(idx), false, null));
+                break;
+            }
+            if (next > idx) {
+                list.add(new ScriptPart(script.substring(idx, next), false, 
null));
+            }
+            // 检查 modifier
+            String modifier = null;
+            if (next + 2 < script.length() && script.charAt(next + 2) == '[') {
+                int end = script.indexOf(']', next + 2);
+                if (end != -1) {
+                    modifier = script.substring(next + 3, end);
+                    next = end;
+                }
             }
+            list.add(new ScriptPart("%s", true, modifier));
+            idx = next + 2;
         }
-        return parts;
+        return list;
     }
 
     @Override
@@ -64,43 +87,7 @@ public class PlaceholderParser extends AbstractLogParser {
 
     @Override
     public List<String> parseLogData(String logData) throws Exception {
-        List<String> result = new ArrayList<>();
-        String remaining = logData;
-        int staticPartIndex = 0;
-
-        try {
-            if (!staticParts.isEmpty() && 
remaining.startsWith(staticParts.get(0))) {
-                remaining = remaining.substring(staticParts.get(0).length());
-                staticPartIndex++;
-            }
-
-            while (staticPartIndex < staticParts.size()) {
-                String nextStatic = staticParts.get(staticPartIndex);
-                int endPos = remaining.indexOf(nextStatic);
-
-                if (endPos == -1) {
-                    if (!remaining.trim().isEmpty()) {
-                        result.add(remaining.trim());
-                    }
-                    break;
-                }
-
-                String fieldValue = remaining.substring(0, endPos).trim();
-                if (!fieldValue.isEmpty()) {
-                    result.add(fieldValue);
-                }
-
-                remaining = remaining.substring(endPos + nextStatic.length());
-                staticPartIndex++;
-            }
-
-            if (!remaining.trim().isEmpty()) {
-                result.add(remaining.trim());
-            }
-
-        } catch (Exception ignored) {
-        }
-        return result;
+        return new 
ArrayList<>(parse(logData).values().stream().map(Object::toString).toList());
     }
 
 
@@ -108,36 +95,97 @@ public class PlaceholderParser extends AbstractLogParser {
         Map<String, Object> result = new LinkedHashMap<>();
         String remaining = logLine;
         int fieldIndex = 0;
-        int staticPartIndex = 0;
 
         try {
-            if (!staticParts.isEmpty() && 
remaining.startsWith(staticParts.get(0))) {
-                remaining = remaining.substring(staticParts.get(0).length());
-                staticPartIndex++;
-            }
+            for (int i = 0; i < parts.size(); i++) {
+                ScriptPart part = parts.get(i);
+                if (part.isPlaceholder) {
+                    // 检查下一个占位符之间是否有方括号
+                    String bracketValue = null;
+                    if (i + 1 < parts.size() && parts.get(i + 
1).isPlaceholder) {
+                        // 尝试提取 [] 内的内容
+                        if (!remaining.isEmpty() && remaining.charAt(0) == 
'[') {
+                            int endBracket = remaining.indexOf(']');
+                            if (endBracket != -1) {
+                                bracketValue = remaining.substring(1, 
endBracket);
+                                remaining = remaining.substring(endBracket + 
1);
+                            }
+                        }
+                    }
 
-            while (fieldIndex < fieldNames.size()) {
-                String nextStatic = staticPartIndex < staticParts.size() ? 
staticParts.get(staticPartIndex) : null;
-                int endPos = nextStatic != null ? 
remaining.indexOf(nextStatic) : remaining.length();
+                    String fieldValue;
+
+                    // n@sep 处理
+                    if (part.modifier != null && 
part.modifier.matches("\\d+@.*")) {
+                        String[] arr = part.modifier.split("@", 2);
+                        int count = Integer.parseInt(arr[0]);
+                        String sep = arr[1];
+                        List<String> tokens = new ArrayList<>();
+                        String temp = remaining;
+                        for (int j = 0; j < count; j++) {
+                            int idxSep = temp.indexOf(sep);
+                            if (idxSep == -1) {
+                                tokens.add(temp);
+                                temp = "";
+                                break;
+                            } else {
+                                tokens.add(temp.substring(0, idxSep));
+                                temp = temp.substring(idxSep + sep.length());
+                            }
+                        }
+                        fieldValue = String.join(sep, tokens).trim();
+                        remaining = temp;
+                    } else {
+                        // 普通 %s
+                        String nextStatic = null;
+                        for (int j = i + 1; j < parts.size(); j++) {
+                            if (!parts.get(j).isPlaceholder) {
+                                nextStatic = parts.get(j).text;
+                                break;
+                            }
+                        }
+                        int endPos = nextStatic != null ? 
remaining.indexOf(nextStatic) : remaining.length();
+                        if (endPos == -1) {
+                            endPos = remaining.length();
+                        }
+                        fieldValue = remaining.substring(0, endPos).trim();
+                        remaining = remaining.substring(endPos);
+                    }
 
-                if (endPos == -1) return result;
+                    // 如果有方括号值,可以组合或存入 Map
+                    if (bracketValue != null && !bracketValue.isEmpty()) {
+                        fieldValue = bracketValue;
+                    }
 
-                String fieldValue = remaining.substring(0, endPos).trim();
-                result.put(fieldNames.get(fieldIndex++), fieldValue);
-                remaining = remaining.substring(endPos);
+                    // 丢弃字段
+                    if (!"-".equals(part.modifier) && fieldIndex < 
fieldNames.size()) {
+                        result.put(fieldNames.get(fieldIndex++), fieldValue);
+                    }
 
-                if (nextStatic != null) {
-                    if (!remaining.startsWith(nextStatic)) {
+                } else {
+                    // 静态部分严格匹配
+                    if (!remaining.startsWith(part.text)) {
                         return Collections.emptyMap();
                     }
-                    remaining = remaining.substring(nextStatic.length());
-                    staticPartIndex++;
+                    remaining = remaining.substring(part.text.length());
                 }
             }
-
-            return result;
         } catch (Exception e) {
             return Collections.emptyMap();
         }
+
+        return result;
+    }
+
+    private static class ScriptPart {
+        String text;
+        boolean isPlaceholder;
+        String modifier;
+
+        public ScriptPart(String text, boolean isPlaceholder, String modifier) 
{
+            this.text = text;
+            this.isPlaceholder = isPlaceholder;
+            this.modifier = modifier;
+        }
     }
 }
diff --git a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index befc7e06..d419cf66 100644
--- a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++ b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -145,11 +145,11 @@ public class LogParserTest {
     @Test
     public void LogPlaceholderParserTest() throws Exception {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        String keyList = 
"level:keyword,message:text,message_body:text,hostname:keyword,trace_id:keyword,thread:keyword,timestamp:date,time:date,linenumber:long,mqtag:keyword,tail:keyword,filename:keyword,logstore:keyword,logsource:keyword,logip:keyword,mqtopic:keyword";
-        String keyOrderList = 
"level:1,message:1,message_body:1,hostname:1,trace_id:1,thread:1,timestamp:1,time:1,linenumber:3,mqtag:3,tail:3,filename:3,logstore:3,logsource:3,logip:3,mqtopic:3";
-        String valueList = "2,5,-1,1,3,4,-1,0";
-        String parseScript = "[%s] [%s] [%s] [%s] [%s] %s %s";
-        String logData = "[2025-04-09T10:56:55.259+08:00] [kfs-test-123] 
[ERROR] [485bf6a9b5898ecdfd22696325b11b05] [DubboServerHandler-thread-495] 
c.x.k.w.s.i.DataDictServiceImpl - cache is not found. CacheLoader returned null 
for key DataDictServiceImpl$TenantKey@2f123a.";
+        String keyList = 
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,level:keyword,time:keyword,trace_id:keyword,msg:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword,traceId:keyword";
+        String keyOrderList = 
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,level:1,time:1,trace_id:1,msg:1,tailId:3,spaceId:3,storeId:3,deploySpace:3,traceId:1";
+        String valueList = "-1,-1,0,1,2,3,-1";
+        String parseScript = "[%s %s[2@ ] %s[-]] -[trace_id=%s] %s - %s";
+        String logData = "[INFO 2025-09-03 09:47:05.233 http-nio-1115-exec-4] 
-[trace_id=47f8940234d777de50d17685171a6183] 
com.test.update.web.controller.v0.UpdatesController - request info 
d:sapphiren_id_global to test";
         Long collectStamp = Instant.now().toEpochMilli();
         Integer parserType = 
LogParserFactory.LogParserEnum.PLACEHOLDER_PARSE.getCode();
         LogParser customParse = LogParserFactory.getLogParser(parserType, 
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName, 
keyOrderList);
diff --git a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
index 022f04de..b2ad8170 100644
--- a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
+++ b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
@@ -47,13 +47,13 @@ public class PathUtilsTest {
 
     @Test
     public void test00() {
-        String logPattern = 
"/home/work/logs/neo-logs/(a|b)/applogs/mi_com_event/mi_com_event.log";
+        String logPattern = "/home/work/log/log-test/info-*.log";
         List<String> watches = PathUtils.parseWatchDirectory(logPattern);
         System.out.println(new Gson().toJson(watches));
 
-        Assert.assertEquals(2, watches.size());
-        Assert.assertEquals("/home/work/logs/neo-logs/a/applogs", 
watches.get(0));
-        Assert.assertEquals("/home/work/logs/neo-logs/b/applogs", 
watches.get(1));
+        Assert.assertEquals(1, watches.size());
+        Assert.assertEquals("/home/work/log/log-test/info-1.log", logPattern);
+        Assert.assertEquals("/home/work/logs/neo-logs/b/applogs", logPattern);
     }
 
     @Test
@@ -313,10 +313,10 @@ public class PathUtilsTest {
 
     @Test
     public void testLogPattern1() {
-        String logSplitExpress = 
"/home/work/logs/neo-logs/(eventapi-stable-66fd975598-z8fd9|eventapi-stable-66fd975598-zfhwq|eventapi-ams-runscript-stable-5c76f54c68-lrmcl|eventapi-stable-66fd975598-l7c4s|eventapi-stable-66fd975598-ckwwd)/applogs/mi_com_event.log-.*";
+        String logSplitExpress = "/home/work/log/log-test/info-.*.log";
         Pattern pattern = Pattern.compile(logSplitExpress);
-        Assert.assertEquals(true, 
pattern.matcher("/home/work/logs/neo-logs/eventapi-stable-66fd975598-z8fd9/applogs/mi_com_event.log-2022-01-11-16").matches());
-        Assert.assertEquals(true, 
pattern.matcher("/home/work/logs/neo-logs/eventapi-ams-runscript-stable-5c76f54c68-lrmcl/applogs/mi_com_event.log-2022-01-11-16").matches());
+        Assert.assertEquals(true, 
pattern.matcher("/home/work/log/log-test/info-1.log").matches());
+        Assert.assertEquals(false, 
pattern.matcher("/home/work/logs/neo-logs/info-2.log").matches());
     }
 
     @Test
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
index 39657110..52a61d9d 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/domain/Tpc.java
@@ -19,12 +19,6 @@
 package org.apache.ozhera.log.manager.domain;
 
 import com.google.common.collect.Lists;
-import org.apache.ozhera.log.manager.common.context.MoneUserContext;
-import org.apache.ozhera.log.manager.common.exception.MilogManageException;
-import org.apache.ozhera.log.manager.dao.MilogSpaceDao;
-import org.apache.ozhera.log.manager.model.MilogSpaceParam;
-import org.apache.ozhera.log.manager.model.pojo.MilogSpaceDO;
-import org.apache.ozhera.log.manager.user.MoneUser;
 import com.xiaomi.mone.tpc.api.service.NodeFacade;
 import com.xiaomi.mone.tpc.api.service.NodeUserFacade;
 import com.xiaomi.mone.tpc.api.service.UserOrgFacade;
@@ -33,6 +27,7 @@ import com.xiaomi.mone.tpc.common.enums.NodeTypeEnum;
 import com.xiaomi.mone.tpc.common.enums.OutIdTypeEnum;
 import com.xiaomi.mone.tpc.common.enums.UserTypeEnum;
 import com.xiaomi.mone.tpc.common.param.*;
+import com.xiaomi.mone.tpc.common.vo.NodeUserRelVo;
 import com.xiaomi.mone.tpc.common.vo.NodeVo;
 import com.xiaomi.mone.tpc.common.vo.OrgInfoVo;
 import com.xiaomi.mone.tpc.common.vo.PageDataVo;
@@ -43,12 +38,19 @@ import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
 import com.xiaomi.youpin.infra.rpc.Result;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.ozhera.log.manager.common.context.MoneUserContext;
+import org.apache.ozhera.log.manager.common.exception.MilogManageException;
+import org.apache.ozhera.log.manager.dao.MilogSpaceDao;
+import org.apache.ozhera.log.manager.model.MilogSpaceParam;
+import org.apache.ozhera.log.manager.model.pojo.MilogSpaceDO;
+import org.apache.ozhera.log.manager.user.MoneUser;
 
 import javax.annotation.Resource;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.ozhera.log.common.Constant.DEFAULT_OPERATOR;
 import static org.apache.ozhera.log.common.Constant.GSON;
 
 @Service
@@ -251,27 +253,21 @@ public class Tpc {
         tpcUserService.add(add);
     }
 
-    public NodeVo getByOuterId(Long id, Integer outType) {
+    public NodeVo getByOuterId(Long id, String account) {
         MoneUser currentUser = MoneUserContext.getCurrentUser();
         NodeQryParam param = new NodeQryParam();
-        param.setPager(false);
-        param.setAccount(currentUser == null ? "system" : 
currentUser.getUser());
+        param.setAccount(StringUtils.isNotBlank(account) ? account : 
currentUser.getUser());
         param.setUserType(UserTypeEnum.CAS_TYPE.getCode());
-        param.setType(NodeTypeEnum.PRO_SUB_GROUP.getCode());
-        if (null != currentUser) {
-            param.setUserType(currentUser.getUserType());
-        }
         param.setStatus(NodeStatusEnum.ENABLE.getCode());
         param.setOutIdType(OutIdTypeEnum.SPACE.getCode());
         param.setOutId(id);
         param.setMyNode(false);
         Result<NodeVo> nodeVoResult = tpcService.getByOutId(param);
-        NodeVo data = nodeVoResult.getData();
-        return data;
+        return nodeVoResult.getData();
     }
 
     public NodeVo getSpaceByOuterId(Long id) {
-        return getByOuterId(id, OutIdTypeEnum.SPACE.getCode());
+        return getByOuterId(id, "");
     }
 
     public String getSpaceLastOrg(Long id) {
@@ -295,4 +291,19 @@ public class Tpc {
         return res.getData();
     }
 
+    public List<NodeUserRelVo> querySpaceMember(Long spaceId) {
+        NodeVo nodeVo = getByOuterId(spaceId, DEFAULT_OPERATOR);
+        if (null != nodeVo) {
+            NodeUserQryParam param = new NodeUserQryParam();
+            param.setNodeId(nodeVo.getId());
+            param.setAccount(DEFAULT_OPERATOR);
+            param.setUserType(UserTypeEnum.CAS_TYPE.getCode());
+            Result<PageDataVo<NodeUserRelVo>> nodeUserResult = 
tpcUserService.list(param);
+            PageDataVo<NodeUserRelVo> nodeUserResultData = 
nodeUserResult.getData();
+            return nodeUserResultData.getList();
+        }
+        return Lists.newArrayList();
+    }
+
+
 }
diff --git a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
index c3824a54..08ed3bc2 100644
--- a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
+++ b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/model/pojo/MilogLogTailDo.java
@@ -19,11 +19,11 @@
 package org.apache.ozhera.log.manager.model.pojo;
 
 import com.alibaba.fastjson.annotation.JSONField;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import org.apache.ozhera.log.api.model.meta.FilterDefine;
 import org.apache.ozhera.log.manager.model.BaseCommon;
 import org.apache.ozhera.log.manager.model.dto.MotorRoomDTO;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
 import org.nutz.dao.entity.annotation.*;
 import org.nutz.json.JsonField;
 
@@ -155,7 +155,7 @@ public class MilogLogTailDo extends BaseCommon {
     @Column(value = "collection_ready")
     @ColDefine(type = ColType.VARCHAR, width = 1024)
     @Comment("start to ready coll")
-    private Boolean collectionReady;
+    private Boolean collectionReady = true;
 
     @Column(value = "origin_system")
     @ColDefine(type = ColType.VARCHAR, width = 1024)
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
index 58c136fd..833b3961 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/ConfigManager.java
@@ -59,6 +59,9 @@ public class ConfigManager {
     @Value("$hera.stream.monitor_space_data_id")
     private String spaceDataId;
 
+    @Value("$hera.stream.type")
+    private String streamType;
+
     //final String spaceDataId = LOG_MANAGE_PREFIX + NAMESPACE_CONFIG_DATA_ID;
 
     /**
@@ -177,11 +180,25 @@ public class ConfigManager {
             log.warn("listen dataID:{},groupId:{},but receive config is 
empty,uniqueMarks:{}", spaceDataId, DEFAULT_GROUP_ID, uniqueMark);
             return;
         }
-        Map<Long, String> dataIdMap = 
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
-        log.info("uniqueMark:{},data key:{}", uniqueMark, 
objectMapper.writeValueAsString(dataIdMap.keySet()));
+        if ("all".equals(streamType)) {
+            for (Map.Entry<String, Map<Long, String>> entry : 
config.entrySet()) {
+                Map<Long, String> dataIdMap = entry.getValue();
+                log.info("uniqueMark:{},data key:{}", uniqueMark, 
objectMapper.writeValueAsString(dataIdMap.keySet()));
+                processDataIdMapWithLock(dataIdMap);
+            }
+        } else {
+            Map<Long, String> dataIdMap = 
extensionInstance.getConfigMapByUniqueMark(config, uniqueMark);
+            log.info("uniqueMark:{},data key:{}", uniqueMark, 
objectMapper.writeValueAsString(dataIdMap.keySet()));
+            processDataIdMapWithLock(dataIdMap);
+        }
+    }
+
+    private void processDataIdMapWithLock(Map<Long, String> dataIdMap) throws 
Exception {
         if (spaceLock.tryLock()) {
             try {
-                stopUnusefulListenerAndJob(dataIdMap);
+                if (!"all".equals(streamType)) {
+                    stopUnusefulListenerAndJob(dataIdMap);
+                }
                 startNewListenerAndJob(dataIdMap);
             } finally {
                 spaceLock.unlock();
@@ -196,16 +213,6 @@ public class ConfigManager {
         return Ioc.ins().getBean(factualServiceName);
     }
 
-    /**
-     * The new {spaceid,dataid} A is compared to {spaceid,dataid} B in memory 
to filter out the sets A-B
-     *
-     * @param spaceId
-     * @return
-     */
-    public boolean existListener(Long spaceId) {
-        return milogSpaceDataMap.containsKey(spaceId);
-    }
-
     /**
      * The new {spaceId,dataId} A is compared to {spaceId,dataId} B in memory 
and filtered out the set B-A
      *
@@ -287,7 +294,7 @@ public class ConfigManager {
         milogStreamDataMap.forEach((spaceId, dataId) -> {
             // there is no listener corresponding to the spaceId in memory
             try {
-                if (!existListener(spaceId)) {
+                if (!milogSpaceDataMap.containsKey(spaceId)) {
                     log.info("startNewListenerAndJob for spaceId start:{}", spaceId);
                     // Get a copy of the spaceData configuration through the dataID and put it in the configListener cache
                     MilogSpaceData milogSpaceData = getMilogSpaceData(dataId);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index c650c267..a7357809 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -20,6 +20,7 @@ package org.apache.ozhera.log.stream.job;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.xiaomi.youpin.docean.Ioc;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -62,6 +63,7 @@ public class JobManager {
         sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
         sinkChain = Ioc.ins().getBean(SinkChain.class);
         jobs = new ConcurrentHashMap<>();
+        objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
     }
 
     public void closeJobs(MilogSpaceData milogSpaceData) throws JsonProcessingException {
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
index c312e965..bb31de93 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
@@ -140,6 +140,12 @@ public class LogDataTransfer {
         return objectMapper.readValue(msg, LineMessage.class);
     }
 
+    public static void main(String[] args) throws JsonProcessingException {
+        String message = 
"{\"extMap\":{\"ct\":\"1756780824719\",\"ip\":\"10.7.84.220\",\"tag\":\"tags_392_120935_132193\",\"type\":\"1\"},\"fileName\":\"/home/work/log/nr-promotion-promotion-admin-global-1080348-c-67bbfb479c-27t45/promotion-admin-global/server.log\",\"lineNumber\":2642,\"msgBody\":\"2025-09-02
 10:40:21,610|INFO 
|4a1378575e03c5d38fa9fd0f7351227a|call_client213|c.x.n.p.a.g.i.u.filter.DubboCommonFilter|dubbo
 invoke. service:com.xiaomi.nr.promotion.admin.global.impl.DubboHea [...]
+        LineMessage lineMessage = new ObjectMapper().readValue(message, LineMessage.class);
+        System.out.println(lineMessage.getTimestamp());
+    }
+
     private void putCommonData(Map<String, Object> dataMap) {
         dataMap.putIfAbsent(LOG_STREAM_SPACE_ID, sinkJobConfig.getLogSpaceId());
         dataMap.putIfAbsent(LOG_STREAM_STORE_ID, sinkJobConfig.getLogStoreId());
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
index f33fc498..cad12b8b 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
@@ -150,11 +150,15 @@ public class EsPlugin {
         }
         MqMessageDTO MqMessageDTO = new MqMessageDTO();
         MqMessageDTO.setEsInfo(esInfo);
+        AtomicInteger count = new AtomicInteger();
         List<MqMessageDTO.CompensateMqDTO> compensateMqDTOS = Lists.newArrayList();
         request.requests().stream().filter(x -> x instanceof IndexRequest)
                 .forEach(x -> {
+                    int currentNum = count.incrementAndGet();
                     Map source = ((IndexRequest) x).sourceAsMap();
-                    log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source));
+                    if (currentNum == 1 || currentNum % 600 == 0) {
+                        log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source));
+                    }
                     MqMessageDTO.CompensateMqDTO compensateMqDTO = new MqMessageDTO.CompensateMqDTO();
                     compensateMqDTO.setMsg(JSON.toJSONString(source));
                     compensateMqDTO.setEsIndex(x.index());
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
index 1d376cf2..71b6b357 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/nacos/LevelFilterConfigListener.java
@@ -58,7 +58,7 @@ public class LevelFilterConfigListener {
         ScheduledExecutorService scheduledExecutor = Executors
                 .newSingleThreadScheduledExecutor(ThreadUtil.newNamedThreadFactory("log-level-filter", false));
         scheduledExecutor.scheduleAtFixedRate(() ->
-                SafeRun.run(() -> configChangeOperate()), 0, 1, TimeUnit.MINUTES);
+                SafeRun.run(this::configChangeOperate), 0, 1, TimeUnit.MINUTES);
     }
 
     private void configChangeOperate() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to