This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new aaa5c53e fix: optimize file reading exception handling (#623)
aaa5c53e is described below
commit aaa5c53ee7e9d41efaf1a8fa74e803ac132c4858
Author: wtt <[email protected]>
AuthorDate: Tue Dec 16 10:52:49 2025 +0800
fix: optimize file reading exception handling (#623)
* fix: solve the blocking problem caused by serverless startup
* fix: fix pipeline execution logic error
* fix: optimize file reading exception handling
---
.../porcessor/AgentCollectProgressProcessor.java | 41 +++++++++++----
.../log/agent/channel/AbstractChannelService.java | 6 ++-
.../log/agent/channel/ChannelServiceImpl.java | 60 +++++++++++-----------
.../agent/channel/WildcardChannelServiceImpl.java | 8 +--
4 files changed, 70 insertions(+), 45 deletions(-)
diff --git a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
index af9339e0..a1e33526 100644
--- a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
+++ b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/porcessor/AgentCollectProgressProcessor.java
@@ -20,16 +20,19 @@ package org.apache.ozhera.log.server.porcessor;
import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
-import org.apache.ozhera.log.common.Constant;
-import org.apache.ozhera.log.server.common.Version;
-import org.apache.ozhera.log.server.service.DefaultLogProcessCollector;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.anno.Component;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
+import org.apache.ozhera.log.common.Constant;
+import org.apache.ozhera.log.server.common.Version;
+import org.apache.ozhera.log.server.service.DefaultLogProcessCollector;
import javax.annotation.Resource;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import static org.apache.ozhera.log.common.Constant.GSON;
@@ -54,13 +57,17 @@ public class AgentCollectProgressProcessor implements
NettyRequestProcessor {
log.debug("received a message from the agent");
RemotingCommand response =
RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE);
String body = new String(request.getBody(), StandardCharsets.UTF_8);
- UpdateLogProcessCmd cmd = GSON.fromJson(body,
UpdateLogProcessCmd.class);
- log.debug("a request from the client sent by the agent:{}",
cmd.getIp());
- if (null == processService &&
Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
- processService =
Ioc.ins().getBean(DefaultLogProcessCollector.class);
- }
- if (null != processService) {
- processService.collectLogProcess(cmd);
+ try {
+ UpdateLogProcessCmd cmd = GSON.fromJson(body,
UpdateLogProcessCmd.class);
+ log.debug("a request from the client sent by the agent:{}",
cmd.getIp());
+ if (null == processService &&
Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
+ processService =
Ioc.ins().getBean(DefaultLogProcessCollector.class);
+ }
+ if (null != processService) {
+ processService.collectLogProcess(cmd);
+ }
+ } catch (Exception e) {
+ log.error("processRequest error,ip:{},body:{}", getIp(ctx), body,
e);
}
response.setBody(version.toString().getBytes());
response.setBody(Constant.SUCCESS_MESSAGE.getBytes());
@@ -71,4 +78,16 @@ public class AgentCollectProgressProcessor implements
NettyRequestProcessor {
public boolean rejectRequest() {
return false;
}
+
+
+ private String getIp(ChannelHandlerContext ctx) {
+ SocketAddress sa = ctx.channel().remoteAddress();
+ if (sa instanceof InetSocketAddress) {
+ InetSocketAddress isa = (InetSocketAddress) sa;
+ String ip = isa.getAddress().getHostAddress();
+ int port = isa.getPort();
+ return String.format("%s:%d", ip, port);
+ }
+ return StringUtils.EMPTY;
+ }
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index 275df882..a34f7cdb 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -18,7 +18,6 @@
*/
package org.apache.ozhera.log.agent.channel;
-import cn.hutool.core.io.FileUtil;
import com.xiaomi.mone.file.ReadResult;
import com.xiaomi.mone.file.common.FileInfoCache;
import com.xiaomi.mone.file.common.FileUtils;
@@ -208,9 +207,12 @@ public abstract class AbstractChannelService implements
ChannelService {
if (null != readResult.get().getFileMaxPointer()) {
fileProgress.setFileMaxPointer(readResult.get().getFileMaxPointer());
}
- if (FileUtil.exist(fileName)) {
+ try {
fileProgress.setUnixFileNode(ChannelUtil.buildUnixFileNode(fileName));
+ } catch (Throwable e) {
+ log.error("updateChannelMemory error,channelId:{},fileName:{}",
channelDefine.getChannelId(), fileName, e);
}
+
fileProgress.setPodType(channelDefine.getPodType());
fileProgress.setCtTime(ct);
}
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 7ea9bc5a..6ea04c38 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -89,12 +89,12 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
private final Map<String, Pair<MLog, AtomicReference<ReadResult>>>
resultMap = new ConcurrentHashMap<>();
-
+
/**
* Track file read exceptions for retry mechanism
*/
private final Map<String, Integer> fileExceptionCountMap = new
ConcurrentHashMap<>();
-
+
/**
* Track last file truncation check time
*/
@@ -282,13 +282,13 @@ public class ChannelServiceImpl extends
AbstractChannelService {
reOpen(path);
continue;
}
-
+
// Check for file truncation during runtime (handles
copytruncate rotation)
checkFileTruncation(path);
}
}), 15, 15, TimeUnit.SECONDS);
}
-
+
/**
* Check if file has been truncated during runtime (handles copytruncate
log rotation)
*/
@@ -298,27 +298,27 @@ public class ChannelServiceImpl extends
AbstractChannelService {
if (fileProgress == null || fileProgress.getPointer() <= 0) {
return;
}
-
+
java.io.File file = new java.io.File(filePath);
if (!file.exists()) {
return;
}
-
+
long savedPointer = fileProgress.getPointer();
long currentFileLength = file.length();
-
+
if (savedPointer > currentFileLength) {
long lastCheckTime =
fileTruncationCheckMap.getOrDefault(filePath, 0L);
long currentTime = System.currentTimeMillis();
boolean isImmediateTruncation = currentFileLength == 0 ||
currentFileLength < savedPointer * 0.1;
-
+
// For immediate truncation, handle immediately; for gradual,
reduce confirmation time to 15 seconds
if (isImmediateTruncation || (lastCheckTime > 0 &&
(currentTime - lastCheckTime) > 15000)) {
log.warn("File truncation detected: pointer {} > length {}
for file:{}, resetting to 0",
savedPointer, currentFileLength, filePath);
fileProgress.setPointer(0L);
fileProgress.setCurrentRowNum(0L);
-
+
ILogFile logFile = logFileMap.get(filePath);
if (logFile != null && !logFile.getExceptionFinish()) {
handleFileTruncationReopen(filePath);
@@ -332,7 +332,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
log.debug("Error checking file truncation for path:{}", filePath,
e);
}
}
-
+
/**
* Handle file reopen for truncation cases, bypassing frequency limit
* Optimized for fast rotation scenarios
@@ -479,12 +479,14 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String pattern, String patternCode, long ct) {
RequestContext requestContext =
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
- if (pipeline.invoke(requestContext)) {
- return;
- }
+
LineMessage lineMessage = createLineMessage(lineMsg, readResult,
pattern, patternCode, ct);
updateChannelMemory(channelMemory, pattern, logTypeEnum, ct,
readResult);
+
+ if (pipeline.invoke(requestContext)) {
+ return;
+ }
lineMessageList.add(lineMessage);
fileReadMap.put(pattern, ct);
@@ -545,7 +547,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
log.warn("file not exist,file:{}, will monitor for file creation",
filePath);
}
}
-
+
/**
* Check if exception is related to file truncation
*/
@@ -568,22 +570,22 @@ public class ChannelServiceImpl extends
AbstractChannelService {
}
return false;
}
-
+
/**
* Handle file read exceptions with retry mechanism and error
classification
*/
private void handleFileReadException(String filePath, Exception e, String
patternCode, Long channelId) {
int exceptionCount = fileExceptionCountMap.getOrDefault(filePath, 0) +
1;
fileExceptionCountMap.put(filePath, exceptionCount);
-
+
String errorMsg = e.getMessage();
- boolean isPermissionError = e instanceof
java.nio.file.AccessDeniedException
+ boolean isPermissionError = e instanceof
java.nio.file.AccessDeniedException
|| (errorMsg != null && (errorMsg.contains("Permission
denied") || errorMsg.contains("access denied")));
- boolean isFileNotFound = e instanceof java.io.FileNotFoundException
+ boolean isFileNotFound = e instanceof java.io.FileNotFoundException
|| (errorMsg != null && errorMsg.contains("No such file"));
-
+
if (isPermissionError) {
- log.error("File permission denied, channelId:{}, file:{},
patternCode:{}, exceptionCount:{}",
+ log.error("File permission denied, channelId:{}, file:{},
patternCode:{}, exceptionCount:{}",
channelId, filePath, patternCode, exceptionCount, e);
ILogFile logFile = logFileMap.get(filePath);
if (logFile != null) {
@@ -591,18 +593,18 @@ public class ChannelServiceImpl extends
AbstractChannelService {
}
return;
}
-
+
if (isFileNotFound) {
- log.warn("File not found during read, channelId:{}, file:{}, will
retry when file is created",
+ log.warn("File not found during read, channelId:{}, file:{}, will
retry when file is created",
channelId, filePath, exceptionCount);
return;
}
-
+
final int MAX_RETRY_COUNT = 5;
final long RETRY_DELAY_SECONDS = 30;
-
+
if (exceptionCount <= MAX_RETRY_COUNT) {
- log.warn("File read error (retry {}/{}), channelId:{}, file:{},
will retry in {} seconds",
+ log.warn("File read error (retry {}/{}), channelId:{}, file:{},
will retry in {} seconds",
exceptionCount, MAX_RETRY_COUNT, channelId, filePath,
RETRY_DELAY_SECONDS, e);
ExecutorUtil.schedule(() -> SafeRun.run(() -> {
if (FileUtil.exist(filePath)) {
@@ -611,7 +613,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
}
}), RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
} else {
- log.error("File read error exceeded max retries ({}),
channelId:{}, file:{}, marking as finished",
+ log.error("File read error exceeded max retries ({}),
channelId:{}, file:{}, marking as finished",
MAX_RETRY_COUNT, channelId, filePath, e);
ILogFile logFile = logFileMap.get(filePath);
if (logFile != null) {
@@ -783,11 +785,11 @@ public class ChannelServiceImpl extends
AbstractChannelService {
isCriticalRotation = fileProgress.getPointer() >
file.length();
}
}
-
+
// For critical rotation events, bypass frequency limit to avoid
data loss
final long REOPEN_THRESHOLD = isCriticalRotation ? 1000 : 10 *
1000; // 1 second for critical, 10 seconds for normal
- if (!isCriticalRotation && reOpenMap.containsKey(filePath) &&
+ if (!isCriticalRotation && reOpenMap.containsKey(filePath) &&
Instant.now().toEpochMilli() - reOpenMap.get(filePath) <
REOPEN_THRESHOLD) {
log.info("The file has been opened too frequently.Please try
again in 10 seconds.fileName:{}," +
"last time opening time.:{}", filePath,
reOpenMap.get(filePath));
@@ -957,7 +959,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
for (String delFile : delFiles) {
resultMap.remove(delFile);
}
-
+
// Clean up exception count and truncation check maps
delFiles = fileExceptionCountMap.keySet().stream()
.filter(filePath -> filter.test(filePath))
diff --git a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 5260d50f..fc24ca23 100644
--- a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -446,13 +446,15 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String patternCode, long ct) {
RequestContext requestContext =
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
- if (pipeline.invoke(requestContext)) {
- return;
- }
+
String filePathName = readResult.get().getFilePathName();
LineMessage lineMessage = createLineMessage(lineMsg, readResult,
filePathName, patternCode, ct);
updateChannelMemory(channelMemory, filePathName, getLogTypeEnum(), ct,
readResult);
+ if (pipeline.invoke(requestContext)) {
+ return;
+ }
+
lineMessageList.add(lineMessage);
int batchSize = msgExporter.batchExportSize();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]