This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new aaa5c53e fix: optimize file reading exception handling (#623)
aaa5c53e is described below

commit aaa5c53ee7e9d41efaf1a8fa74e803ac132c4858
Author: wtt <[email protected]>
AuthorDate: Tue Dec 16 10:52:49 2025 +0800

    fix: optimize file reading exception handling (#623)
    
    * fix: solve the blocking problem caused by serverless startup
    
    * fix: fix pipeline execution logic error
    
    * fix: optimize file reading exception handling
---
 .../porcessor/AgentCollectProgressProcessor.java   | 41 +++++++++++----
 .../log/agent/channel/AbstractChannelService.java  |  6 ++-
 .../log/agent/channel/ChannelServiceImpl.java      | 60 +++++++++++-----------
 .../agent/channel/WildcardChannelServiceImpl.java  |  8 +--
 4 files changed, 70 insertions(+), 45 deletions(-)

diff --git a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
index af9339e0..a1e33526 100644
--- a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
+++ b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
@@ -20,16 +20,19 @@ package org.apache.ozhera.log.server.porcessor;
 
 import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
 import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
-import org.apache.ozhera.log.common.Constant;
-import org.apache.ozhera.log.server.common.Version;
-import org.apache.ozhera.log.server.service.DefaultLogProcessCollector;
 import com.xiaomi.youpin.docean.Ioc;
 import com.xiaomi.youpin.docean.anno.Component;
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
+import org.apache.ozhera.log.common.Constant;
+import org.apache.ozhera.log.server.common.Version;
+import org.apache.ozhera.log.server.service.DefaultLogProcessCollector;
 
 import javax.annotation.Resource;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.charset.StandardCharsets;
 
 import static org.apache.ozhera.log.common.Constant.GSON;
@@ -54,13 +57,17 @@ public class AgentCollectProgressProcessor implements NettyRequestProcessor {
         log.debug("received a message from the agent");
         RemotingCommand response = 
RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE);
         String body = new String(request.getBody(), StandardCharsets.UTF_8);
-        UpdateLogProcessCmd cmd = GSON.fromJson(body, 
UpdateLogProcessCmd.class);
-        log.debug("a request from the client sent by the agent:{}", 
cmd.getIp());
-        if (null == processService && 
Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
-            processService = 
Ioc.ins().getBean(DefaultLogProcessCollector.class);
-        }
-        if (null != processService) {
-            processService.collectLogProcess(cmd);
+        try {
+            UpdateLogProcessCmd cmd = GSON.fromJson(body, 
UpdateLogProcessCmd.class);
+            log.debug("a request from the client sent by the agent:{}", 
cmd.getIp());
+            if (null == processService && 
Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
+                processService = 
Ioc.ins().getBean(DefaultLogProcessCollector.class);
+            }
+            if (null != processService) {
+                processService.collectLogProcess(cmd);
+            }
+        } catch (Exception e) {
+            log.error("processRequest error,ip:{},body:{}", getIp(ctx), body, 
e);
         }
         response.setBody(version.toString().getBytes());
         response.setBody(Constant.SUCCESS_MESSAGE.getBytes());
@@ -71,4 +78,16 @@ public class AgentCollectProgressProcessor implements NettyRequestProcessor {
     public boolean rejectRequest() {
         return false;
     }
+
+
+    private String getIp(ChannelHandlerContext ctx) {
+        SocketAddress sa = ctx.channel().remoteAddress();
+        if (sa instanceof InetSocketAddress) {
+            InetSocketAddress isa = (InetSocketAddress) sa;
+            String ip = isa.getAddress().getHostAddress();
+            int port = isa.getPort();
+            return String.format("%s:%d", ip, port);
+        }
+        return StringUtils.EMPTY;
+    }
 }
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index 275df882..a34f7cdb 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -18,7 +18,6 @@
  */
 package org.apache.ozhera.log.agent.channel;
 
-import cn.hutool.core.io.FileUtil;
 import com.xiaomi.mone.file.ReadResult;
 import com.xiaomi.mone.file.common.FileInfoCache;
 import com.xiaomi.mone.file.common.FileUtils;
@@ -208,9 +207,12 @@ public abstract class AbstractChannelService implements ChannelService {
         if (null != readResult.get().getFileMaxPointer()) {
             
fileProgress.setFileMaxPointer(readResult.get().getFileMaxPointer());
         }
-        if (FileUtil.exist(fileName)) {
+        try {
             
fileProgress.setUnixFileNode(ChannelUtil.buildUnixFileNode(fileName));
+        } catch (Throwable e) {
+            log.error("updateChannelMemory error,channelId:{},fileName:{}", 
channelDefine.getChannelId(), fileName, e);
         }
+
         fileProgress.setPodType(channelDefine.getPodType());
         fileProgress.setCtTime(ct);
     }
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 7ea9bc5a..6ea04c38 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -89,12 +89,12 @@ public class ChannelServiceImpl extends AbstractChannelService {
     private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
 
     private final Map<String, Pair<MLog, AtomicReference<ReadResult>>> 
resultMap = new ConcurrentHashMap<>();
-    
+
     /**
      * Track file read exceptions for retry mechanism
      */
     private final Map<String, Integer> fileExceptionCountMap = new 
ConcurrentHashMap<>();
-    
+
     /**
      * Track last file truncation check time
      */
@@ -282,13 +282,13 @@ public class ChannelServiceImpl extends AbstractChannelService {
                     reOpen(path);
                     continue;
                 }
-                
+
                 // Check for file truncation during runtime (handles 
copytruncate rotation)
                 checkFileTruncation(path);
             }
         }), 15, 15, TimeUnit.SECONDS);
     }
-    
+
     /**
      * Check if file has been truncated during runtime (handles copytruncate 
log rotation)
      */
@@ -298,27 +298,27 @@ public class ChannelServiceImpl extends AbstractChannelService {
             if (fileProgress == null || fileProgress.getPointer() <= 0) {
                 return;
             }
-            
+
             java.io.File file = new java.io.File(filePath);
             if (!file.exists()) {
                 return;
             }
-            
+
             long savedPointer = fileProgress.getPointer();
             long currentFileLength = file.length();
-            
+
             if (savedPointer > currentFileLength) {
                 long lastCheckTime = 
fileTruncationCheckMap.getOrDefault(filePath, 0L);
                 long currentTime = System.currentTimeMillis();
                 boolean isImmediateTruncation = currentFileLength == 0 || 
currentFileLength < savedPointer * 0.1;
-                
+
                 // For immediate truncation, handle immediately; for gradual, 
reduce confirmation time to 15 seconds
                 if (isImmediateTruncation || (lastCheckTime > 0 && 
(currentTime - lastCheckTime) > 15000)) {
                     log.warn("File truncation detected: pointer {} > length {} 
for file:{}, resetting to 0",
                             savedPointer, currentFileLength, filePath);
                     fileProgress.setPointer(0L);
                     fileProgress.setCurrentRowNum(0L);
-                    
+
                     ILogFile logFile = logFileMap.get(filePath);
                     if (logFile != null && !logFile.getExceptionFinish()) {
                         handleFileTruncationReopen(filePath);
@@ -332,7 +332,7 @@ public class ChannelServiceImpl extends AbstractChannelService {
             log.debug("Error checking file truncation for path:{}", filePath, 
e);
         }
     }
-    
+
     /**
      * Handle file reopen for truncation cases, bypassing frequency limit
      * Optimized for fast rotation scenarios
@@ -479,12 +479,14 @@ public class ChannelServiceImpl extends AbstractChannelService {
 
     private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String pattern, String patternCode, long ct) {
         RequestContext requestContext = 
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
-        if (pipeline.invoke(requestContext)) {
-            return;
-        }
+
         LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
pattern, patternCode, ct);
 
         updateChannelMemory(channelMemory, pattern, logTypeEnum, ct, 
readResult);
+
+        if (pipeline.invoke(requestContext)) {
+            return;
+        }
         lineMessageList.add(lineMessage);
 
         fileReadMap.put(pattern, ct);
@@ -545,7 +547,7 @@ public class ChannelServiceImpl extends AbstractChannelService {
             log.warn("file not exist,file:{}, will monitor for file creation", 
filePath);
         }
     }
-    
+
     /**
      * Check if exception is related to file truncation
      */
@@ -568,22 +570,22 @@ public class ChannelServiceImpl extends AbstractChannelService {
         }
         return false;
     }
-    
+
     /**
      * Handle file read exceptions with retry mechanism and error 
classification
      */
     private void handleFileReadException(String filePath, Exception e, String 
patternCode, Long channelId) {
         int exceptionCount = fileExceptionCountMap.getOrDefault(filePath, 0) + 
1;
         fileExceptionCountMap.put(filePath, exceptionCount);
-        
+
         String errorMsg = e.getMessage();
-        boolean isPermissionError = e instanceof 
java.nio.file.AccessDeniedException 
+        boolean isPermissionError = e instanceof 
java.nio.file.AccessDeniedException
                 || (errorMsg != null && (errorMsg.contains("Permission 
denied") || errorMsg.contains("access denied")));
-        boolean isFileNotFound = e instanceof java.io.FileNotFoundException 
+        boolean isFileNotFound = e instanceof java.io.FileNotFoundException
                 || (errorMsg != null && errorMsg.contains("No such file"));
-        
+
         if (isPermissionError) {
-            log.error("File permission denied, channelId:{}, file:{}, 
patternCode:{}, exceptionCount:{}", 
+            log.error("File permission denied, channelId:{}, file:{}, 
patternCode:{}, exceptionCount:{}",
                     channelId, filePath, patternCode, exceptionCount, e);
             ILogFile logFile = logFileMap.get(filePath);
             if (logFile != null) {
@@ -591,18 +593,18 @@ public class ChannelServiceImpl extends AbstractChannelService {
             }
             return;
         }
-        
+
         if (isFileNotFound) {
-            log.warn("File not found during read, channelId:{}, file:{}, will 
retry when file is created", 
+            log.warn("File not found during read, channelId:{}, file:{}, will 
retry when file is created",
                     channelId, filePath, exceptionCount);
             return;
         }
-        
+
         final int MAX_RETRY_COUNT = 5;
         final long RETRY_DELAY_SECONDS = 30;
-        
+
         if (exceptionCount <= MAX_RETRY_COUNT) {
-            log.warn("File read error (retry {}/{}), channelId:{}, file:{}, 
will retry in {} seconds", 
+            log.warn("File read error (retry {}/{}), channelId:{}, file:{}, 
will retry in {} seconds",
                     exceptionCount, MAX_RETRY_COUNT, channelId, filePath, 
RETRY_DELAY_SECONDS, e);
             ExecutorUtil.schedule(() -> SafeRun.run(() -> {
                 if (FileUtil.exist(filePath)) {
@@ -611,7 +613,7 @@ public class ChannelServiceImpl extends AbstractChannelService {
                 }
             }), RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
         } else {
-            log.error("File read error exceeded max retries ({}), 
channelId:{}, file:{}, marking as finished", 
+            log.error("File read error exceeded max retries ({}), 
channelId:{}, file:{}, marking as finished",
                     MAX_RETRY_COUNT, channelId, filePath, e);
             ILogFile logFile = logFileMap.get(filePath);
             if (logFile != null) {
@@ -783,11 +785,11 @@ public class ChannelServiceImpl extends AbstractChannelService {
                     isCriticalRotation = fileProgress.getPointer() > 
file.length();
                 }
             }
-            
+
             // For critical rotation events, bypass frequency limit to avoid 
data loss
             final long REOPEN_THRESHOLD = isCriticalRotation ? 1000 : 10 * 
1000; // 1 second for critical, 10 seconds for normal
 
-            if (!isCriticalRotation && reOpenMap.containsKey(filePath) && 
+            if (!isCriticalRotation && reOpenMap.containsKey(filePath) &&
                     Instant.now().toEpochMilli() - reOpenMap.get(filePath) < 
REOPEN_THRESHOLD) {
                 log.info("The file has been opened too frequently.Please try 
again in 10 seconds.fileName:{}," +
                         "last time opening time.:{}", filePath, 
reOpenMap.get(filePath));
@@ -957,7 +959,7 @@ public class ChannelServiceImpl extends AbstractChannelService {
         for (String delFile : delFiles) {
             resultMap.remove(delFile);
         }
-        
+
         // Clean up exception count and truncation check maps
         delFiles = fileExceptionCountMap.keySet().stream()
                 .filter(filePath -> filter.test(filePath))
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 5260d50f..fc24ca23 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -446,13 +446,15 @@ public class WildcardChannelServiceImpl extends AbstractChannelService {
 
     private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String patternCode, long ct) {
         RequestContext requestContext = 
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
-        if (pipeline.invoke(requestContext)) {
-            return;
-        }
+
         String filePathName = readResult.get().getFilePathName();
         LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
filePathName, patternCode, ct);
         updateChannelMemory(channelMemory, filePathName, getLogTypeEnum(), ct, 
readResult);
 
+        if (pipeline.invoke(requestContext)) {
+            return;
+        }
+
         lineMessageList.add(lineMessage);
 
         int batchSize = msgExporter.batchExportSize();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to