This is an automated email from the ASF dual-hosted git repository.

dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 333d6244 fix: fix log statistics and pipeline calling logic (#619)
333d6244 is described below

commit 333d6244741be6d33e42fbf4bb8222b8896435ac
Author: wtt <[email protected]>
AuthorDate: Thu Nov 27 10:56:33 2025 +0800

    fix: fix log statistics and pipeline calling logic (#619)
    
    * fix: solve the blocking problem caused by serverless startup
    
    * fix: fix log statistics and pipeline calling logic
---
 .../log/agent/channel/ChannelServiceImpl.java      | 13 ++++---
 .../agent/channel/WildcardChannelServiceImpl.java  | 43 +++++++++++-----------
 .../log/agent/channel/pipeline/Pipeline.java       |  4 +-
 3 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 6611c0eb..73ce0461 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -51,6 +51,7 @@ import org.apache.ozhera.log.utils.NetUtil;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Predicate;
@@ -111,7 +112,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private volatile long lastSendTime = System.currentTimeMillis();
 
-    private volatile long logCounts = 0;
+    private final AtomicLong logCounts = new AtomicLong();
 
     private ScheduledFuture<?> scheduledFuture;
 
@@ -134,7 +135,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     private String linePrefix;
 
 
-    private Pipeline pipeline;
+    private final Pipeline pipeline;
 
     public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService 
memoryService,
                               ChannelDefine channelDefine, FilterChain chain, 
Pipeline pipeline) {
@@ -478,7 +479,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String pattern, String patternCode, long ct) {
         RequestContext requestContext = 
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
-        if (pipeline.invoke(requestContext)) {
+        if (!pipeline.invoke(requestContext)) {
             return;
         }
         LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
pattern, patternCode, ct);
@@ -704,11 +705,11 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
             long current = System.currentTimeMillis();
             msgExporter.export(subList);
-            logCounts += subList.size();
+            logCounts.addAndGet(subList.size());
             lastSendTime = System.currentTimeMillis();
             channelMemory.setCurrentTime(lastSendTime);
 
-            log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(), lastSendTime - current, logCounts, instanceId());
+            log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(), lastSendTime - current, logCounts.get(), instanceId());
         } catch (Exception e) {
             log.error("doExport Exception:{}", e);
         } finally {
@@ -969,7 +970,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     @Override
     public Long getLogCounts() {
-        return this.logCounts;
+        return this.logCounts.get();
     }
 
 
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index ef62b742..f4c17222 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -52,6 +52,7 @@ import java.io.IOException;
 import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
@@ -71,7 +72,7 @@ import static org.apache.ozhera.log.common.PathUtils.*;
 @Slf4j
 public class WildcardChannelServiceImpl extends AbstractChannelService {
 
-    private AgentMemoryService memoryService;
+    private final AgentMemoryService memoryService;
 
     private MsgExporter msgExporter;
 
@@ -79,41 +80,41 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     private ChannelMemory channelMemory;
 
-    private FilterChain chain;
+    private final FilterChain chain;
 
     private String logPattern;
 
     private String linePrefix;
 
-    private String memoryBasePath;
+    private final String memoryBasePath;
 
     private static final String POINTER_FILENAME_PREFIX = ".ozhera_pointer";
 
-    private List<LineMessage> lineMessageList = new ArrayList<>();
+    private final List<LineMessage> lineMessageList = new ArrayList<>();
 
     private ScheduledFuture<?> scheduledFuture;
 
     private ScheduledFuture<?> lastFileLineScheduledFuture;
 
-    private List<Future<?>> fileCollFutures = Lists.newArrayList();
+    private final List<Future<?>> fileCollFutures = Lists.newArrayList();
 
     private volatile long lastSendTime = System.currentTimeMillis();
 
-    private volatile long logCounts = 0;
+    private final AtomicLong logCounts = new AtomicLong();
 
-    private ReentrantLock reentrantLock = new ReentrantLock();
+    private final ReentrantLock reentrantLock = new ReentrantLock();
 
     private DefaultMonitorListener defaultMonitorListener;
 
     private HeraFileMonitor fileMonitor;
-    
+
     /**
      * Track file truncation check time to avoid duplicate handling
      */
     private final Map<String, Long> fileTruncationCheckMap = new 
ConcurrentHashMap<>();
 
 
-    private Pipeline pipeline;
+    private final Pipeline pipeline;
 
     public WildcardChannelServiceImpl(MsgExporter msgExporter, 
AgentMemoryService memoryService,
                                       ChannelDefine channelDefine, FilterChain 
chain,
@@ -217,7 +218,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
                 FileInfoCache.ins().remove(cacheKey);
                 continue;
             }
-            
+
             // Check for file truncation (handles copytruncate log rotation)
             checkFileTruncation(filePath);
         }
@@ -236,29 +237,29 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
                     savedPointer = fileProgress.getPointer();
                 }
             }
-            
+
             if (savedPointer <= 0) {
                 return;
             }
-            
+
             File file = new File(filePath);
             if (!file.exists() || savedPointer <= file.length()) {
                 return;
             }
-            
+
             long lastCheckTime = fileTruncationCheckMap.getOrDefault(filePath, 
0L);
             long currentTime = System.currentTimeMillis();
             long currentFileLength = file.length();
             boolean isImmediateTruncation = currentFileLength == 0 || 
currentFileLength < savedPointer * 0.1;
-            
+
             if (isImmediateTruncation || (lastCheckTime > 0 && (currentTime - 
lastCheckTime) > 15000)) {
                 log.warn("File truncation detected: pointer {} > length {} for 
file:{}, restarting read",
                         savedPointer, currentFileLength, filePath);
-                
+
                 // Shutdown and restart ReadListener for this file
                 for (ReadListener readListener : 
defaultMonitorListener.getReadListenerList()) {
                     if (readListener instanceof 
com.xiaomi.mone.file.listener.OzHeraReadListener) {
-                        com.xiaomi.mone.file.LogFile2 logFile = 
+                        com.xiaomi.mone.file.LogFile2 logFile =
                                 
((com.xiaomi.mone.file.listener.OzHeraReadListener) readListener).getLogFile();
                         if (logFile != null && 
filePath.equals(logFile.getFile())) {
                             logFile.shutdown();
@@ -273,7 +274,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             log.debug("Error checking file truncation for path:{}", filePath, 
e);
         }
     }
-    
+
     // Check for inode changes: when a file with the same path is deleted and 
recreated with a new inode
     // DefaultMonitorListener handles file deletion, but may not detect inode 
changes for files with the same path
     private void checkAndShutdownInodeChangedReadListeners() {
@@ -445,7 +446,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String patternCode, long ct) {
         RequestContext requestContext = 
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
-        if (pipeline.invoke(requestContext)) {
+        if (!pipeline.invoke(requestContext)) {
             return;
         }
         String filePathName = readResult.get().getFilePathName();
@@ -472,11 +473,11 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
             long current = System.currentTimeMillis();
             msgExporter.export(subList);
-            logCounts += subList.size();
+            logCounts.addAndGet(subList.size());
             lastSendTime = System.currentTimeMillis();
             channelMemory.setCurrentTime(lastSendTime);
 
-            log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(), lastSendTime - current, logCounts, instanceId());
+            log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(), lastSendTime - current, logCounts.get(), instanceId());
         } catch (Exception e) {
             log.error("doExport Exception", e);
         } finally {
@@ -522,7 +523,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     @Override
     public Long getLogCounts() {
-        return logCounts;
+        return logCounts.get();
     }
 
     @Override
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
index 511496a5..580c852f 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
@@ -25,8 +25,6 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.ServiceLoader;
 
-import static org.apache.ozhera.log.common.Constant.GSON;
-
 /**
  * @author wtt
  * @date 2025/11/21 15:07
@@ -42,7 +40,7 @@ public class Pipeline {
             valves.add(valve);
         }
         valves.sort(Comparator.naturalOrder());
-        log.info("Pipeline valves: {}", GSON.toJson(valves));
+        log.info("pipeline valves: {}", valves);
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to