This is an automated email from the ASF dual-hosted git repository.
gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 90b3d621 refactor: optimize log collection and parsing logic (#605)
90b3d621 is described below
commit 90b3d6218edaea9d732bbf7d29415d4aa87a9366
Author: wtt <[email protected]>
AuthorDate: Thu Sep 25 15:14:37 2025 +0800
refactor: optimize log collection and parsing logic (#605)
* refactor: optimize log collection and parsing logic
* refactor: adding Apache license header for FilterIdEnum.java
---
.../app/service/impl/HeraAppServiceImpl.java | 15 +++--
.../log/agent/channel/AbstractChannelService.java | 4 +-
.../log/agent/channel/ChannelServiceFactory.java | 20 +++++-
.../log/agent/channel/ChannelServiceImpl.java | 26 ++++----
.../agent/channel/WildcardChannelServiceImpl.java | 26 ++++----
.../apache/ozhera/log/api/enums/FilterIdEnum.java | 40 ++++++++++++
.../ozhera/log/api/model/msg/LineMessage.java | 2 +-
.../org/apache/ozhera/log/common/PathUtils.java | 16 ++---
.../apache/ozhera/log/parse/AbstractLogParser.java | 4 +-
.../apache/ozhera/log/parse/PlaceholderParser.java | 4 +-
.../apache/ozhera/log/common/LogParserTest.java | 22 ++++++-
.../apache/ozhera/log/common/PathUtilsTest.java | 4 +-
.../extension/common/CommonExtensionService.java | 3 +
.../common/DefaultCommonExtensionService.java | 12 +++-
.../service/impl/HeralogHomePageServiceImpl.java | 76 +++++++++++++---------
.../service/impl/MilogConfigNacosServiceImpl.java | 7 +-
.../nacos/impl/SpaceConfigNacosPublisher.java | 5 ++
.../nacos/impl/StreamConfigNacosProvider.java | 7 +-
.../nacos/impl/StreamConfigNacosPublisher.java | 8 ++-
.../ozhera/log/stream/job/LogDataTransfer.java | 8 +--
20 files changed, 208 insertions(+), 101 deletions(-)
diff --git
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
index 4c08b6c6..6d886ac7 100644
---
a/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
+++
b/ozhera-app/app-service/src/main/java/org/apache/ozhera/app/service/impl/HeraAppServiceImpl.java
@@ -22,6 +22,10 @@ import
com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.dubbo.config.annotation.Service;
import org.apache.ozhera.app.api.model.HeraAppBaseInfoModel;
import org.apache.ozhera.app.api.model.HeraAppBaseInfoParticipant;
import org.apache.ozhera.app.api.model.HeraAppBaseQuery;
@@ -37,10 +41,6 @@ import org.apache.ozhera.app.model.HeraAppExcessInfo;
import org.apache.ozhera.app.model.HeraAppRole;
import org.apache.ozhera.app.service.HeraAppRoleService;
import org.apache.ozhera.app.service.extension.AppTypeServiceExtension;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.dubbo.config.annotation.Service;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -76,6 +76,8 @@ public class HeraAppServiceImpl implements HeraAppService {
private final AppTypeServiceExtension appTypeServiceExtension;
+ private final Integer DEFAULT_LIMIT = 2000;
+
public HeraAppServiceImpl(HeraAppBaseInfoMapper heraAppBaseInfoMapper,
HeraAppExcessInfoMapper heraAppExcessInfoMapper, HeraAppRoleService
roleService, HeraAppRoleMapper heraAppRoleMapper, AppTypeServiceExtension
appTypeServiceExtension) {
this.heraAppBaseInfoMapper = heraAppBaseInfoMapper;
this.heraAppExcessInfoMapper = heraAppExcessInfoMapper;
@@ -101,6 +103,9 @@ public class HeraAppServiceImpl implements HeraAppService {
return appBaseInfo;
}).collect(Collectors.toList());
}
+ if (appBaseInfos.size() > DEFAULT_LIMIT) {
+ return new
ArrayList<>(appBaseInfos.stream().limit(DEFAULT_LIMIT).toList());
+ }
return appBaseInfos;
}
@@ -115,7 +120,7 @@ public class HeraAppServiceImpl implements HeraAppService {
platformType = appTypeServiceExtension.getAppPlatForm(type);
}
appBaseInfos = heraAppBaseInfoMapper.queryAppInfo(appName,
platformType, appType);
- }else{
+ } else {
appBaseInfos = heraAppBaseInfoMapper.queryLatestAppInfo(limit,
platformType, appType);
}
if (CollectionUtils.isNotEmpty(appBaseInfos)) {
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
index a82bbdce..275df882 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/AbstractChannelService.java
@@ -173,14 +173,14 @@ public abstract class AbstractChannelService implements
ChannelService {
}));
}
- protected LineMessage createLineMessage(String lineMsg,
AtomicReference<ReadResult> readResult, String pattern, String patternCode,
String ip, long ct) {
+ protected LineMessage createLineMessage(String lineMsg,
AtomicReference<ReadResult> readResult, String pattern, String patternCode,
long ct) {
LineMessage lineMessage = new LineMessage();
lineMessage.setMsgBody(lineMsg);
lineMessage.setPointer(readResult.get().getPointer());
lineMessage.setLineNumber(readResult.get().getLineNumber());
lineMessage.setFileName(pattern);
lineMessage.setProperties(LineMessage.KEY_MQ_TOPIC_TAG, patternCode);
- lineMessage.setProperties(LineMessage.KEY_IP, ip);
+ lineMessage.setProperties(LineMessage.KEY_IP, getTailPodIp(pattern));
lineMessage.setProperties(LineMessage.KEY_COLLECT_TIMESTAMP,
String.valueOf(ct));
String logType = getChannelDefine().getInput().getType();
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index f49e9876..6d2f042e 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -30,6 +30,7 @@ import org.apache.ozhera.log.api.enums.LogTypeEnum;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
@@ -49,7 +50,7 @@ public class ChannelServiceFactory {
private final AgentMemoryService agentMemoryService;
private final String memoryBasePath;
- private static final Pattern regexCharsPattern =
Pattern.compile("[*+?^${}()\\[\\]\\\\]");
+ private static final Pattern REGEX_CHARS_PATTERN =
Pattern.compile("[*+?^${}\\[\\]]");
private static List<String> multiSpecialFileSuffix;
@@ -115,7 +116,22 @@ public class ChannelServiceFactory {
if (StringUtils.isEmpty(logPattern)) {
return false;
}
- return regexCharsPattern.matcher(logPattern).find();
+
+ boolean hasGroup = logPattern.contains("(") &&
logPattern.contains(")") && logPattern.contains("|");
+ boolean hasRegexChars = REGEX_CHARS_PATTERN.matcher(logPattern).find();
+
+ // Case 1: Contains regular key symbols (*, ?, ^, $, { }, [ ], \, .) →
must be regular
+ if (hasRegexChars) {
+ return true;
+ }
+
+ // Case 2: Only include () and | → enumeration, not regular
+ if (hasGroup) {
+ return false;
+ }
+
+ // Case 3: Normal path
+ return false;
}
private ChannelService createStandardChannelService(MsgExporter exporter,
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index 07c5e325..7efdf610 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -234,7 +234,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void startCollectFile(Long channelId, Input input, List<String>
patterns) {
for (int i = 0; i < patterns.size(); i++) {
log.info("startCollectFile,total file:{},start:{},remain:{}",
patterns.size(), i + 1, patterns.size() - (i + 1));
- readFile(input.getPatternCode(), getTailPodIp(patterns.get(i)),
patterns.get(i), channelId);
+ readFile(input.getPatternCode(), getTailPodIp(patterns.get(i)),
channelId);
InodeFileComparator.addFile(patterns.get(i));
}
lastLineRemainSendSchedule(input.getPatternCode());
@@ -242,13 +242,11 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private void handleAllFileCollectMonitor(String patternCode, String
newFilePath, Long channelId) {
- String ip = getTailPodIp(newFilePath);
-
if (logFileMap.keySet().stream().anyMatch(key ->
Objects.equals(newFilePath, key))) {
log.info("collectOnce open file:{}", newFilePath);
logFileMap.get(newFilePath).setReOpen(true);
} else {
- readFile(patternCode, ip, newFilePath, channelId);
+ readFile(patternCode, newFilePath, channelId);
}
}
@@ -296,7 +294,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
monitorFileList.add(MonitorFile.of(configPath,
cleanedPathList.getFirst(), logTypeEnum, collectOnce));
}
- private ReadListener initFileReadListener(MLog mLog, String patternCode,
String ip, String pattern) {
+ private ReadListener initFileReadListener(MLog mLog, String patternCode,
String pattern) {
AtomicReference<ReadResult> readResult = new AtomicReference<>();
ReadListener listener = new DefaultReadListener(event -> {
readResult.set(event.getReadResult());
@@ -318,7 +316,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
if (null != l) {
try {
fileColLock.lock();
- wrapDataToSend(l, readResult, pattern,
patternCode, ip, ct);
+ wrapDataToSend(l, readResult, pattern,
patternCode, ct);
} finally {
fileColLock.unlock();
}
@@ -347,7 +345,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
String remainMsg = mLog.takeRemainMsg2();
if (null != remainMsg) {
log.info("start send last
line,pattern:{},patternCode:{},ip:{},data:{}", pattern, patternCode,
getTailPodIp(pattern), remainMsg);
- wrapDataToSend(remainMsg,
referenceEntry.getValue().getValue(), pattern, patternCode,
getTailPodIp(pattern), appendTime);
+ wrapDataToSend(remainMsg,
referenceEntry.getValue().getValue(), pattern, patternCode, appendTime);
}
} finally {
fileColLock.unlock();
@@ -358,8 +356,8 @@ public class ChannelServiceImpl extends
AbstractChannelService {
}), 30, 30, TimeUnit.SECONDS);
}
- private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String pattern, String patternCode, String ip, long ct) {
- LineMessage lineMessage = createLineMessage(lineMsg, readResult,
pattern, patternCode, ip, ct);
+ private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String pattern, String patternCode, long ct) {
+ LineMessage lineMessage = createLineMessage(lineMsg, readResult,
pattern, patternCode, ct);
updateChannelMemory(channelMemory, pattern, logTypeEnum, ct,
readResult);
lineMessageList.add(lineMessage);
@@ -372,14 +370,12 @@ public class ChannelServiceImpl extends
AbstractChannelService {
}
}
- private void readFile(String patternCode, String ip, String filePath, Long
channelId) {
+ private void readFile(String patternCode, String filePath, Long channelId)
{
MLog mLog = new MLog();
if (StringUtils.isNotBlank(this.linePrefix)) {
mLog.setCustomLinePattern(this.linePrefix);
}
- String usedIp = StringUtils.isBlank(ip) ? NetUtil.getLocalIp() : ip;
-
- ReadListener listener = initFileReadListener(mLog, patternCode,
usedIp, filePath);
+ ReadListener listener = initFileReadListener(mLog, patternCode,
filePath);
Map<String, ChannelMemory.FileProgress> fileProgressMap =
channelMemory.getFileProgressMap();
printMapToJson(fileProgressMap, collectOnce);
@@ -399,7 +395,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
logFile.readLine();
} catch (Exception e) {
logFile.setExceptionFinish();
- log.error("logFile read line
err,channelId:{},localIp:{},file:{},patternCode:{}", channelId, usedIp,
fileProgressMap, patternCode, e);
+ log.error("logFile read line
err,channelId:{},file:{},patternCode:{}", channelId, fileProgressMap,
patternCode, e);
}
});
futureMap.put(filePath, future);
@@ -568,7 +564,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
String ip = StringUtils.isBlank(tailPodIp) ? NetUtil.getLocalIp()
: tailPodIp;
if (null == logFile || logFile.getExceptionFinish()) {
// Add new log file
- readFile(channelDefine.getInput().getPatternCode(), ip,
filePath, getChannelId());
+ readFile(channelDefine.getInput().getPatternCode(), filePath,
getChannelId());
log.info("watch new file create for
channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
} else {
handleExistingLogFileWithRetry(logFile, filePath, ip);
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index 0d261870..0b401efd 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -135,7 +135,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
}
memoryService.cleanChannelMemoryContent(channelId, patterns);
- startCollectFile(channelId, input, getTailPodIp(logPattern));
+ startCollectFile(channelId, input);
startExportQueueDataThread();
memoryService.refreshMemory(channelMemory);
@@ -143,13 +143,13 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
}
- private void startCollectFile(Long channelId, Input input, String ip) {
+ private void startCollectFile(Long channelId, Input input) {
try {
// Load the restart file
String restartFile = buildRestartFilePath();
FileInfoCache.ins().load(restartFile);
- fileMonitor = createFileMonitor(input.getPatternCode(), ip);
+ fileMonitor = createFileMonitor(input.getPatternCode());
String fileExpression = buildFileExpression(input.getLogPattern());
@@ -166,7 +166,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
fileCollFutures.add(getExecutorServiceByType(getLogTypeEnum()).submit(() ->
monitorFileChanges(fileMonitor, monitorPath, pattern)));
}
} catch (Exception e) {
- log.error("startCollectFile error, channelId: {}, input: {}, ip:
{}", channelId, GSON.toJson(input), ip, e);
+ log.error("startCollectFile error, channelId: {}, input: {}",
channelId, GSON.toJson(input), e);
}
}
@@ -257,7 +257,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
}
- private HeraFileMonitor createFileMonitor(String patternCode, String ip) {
+ private HeraFileMonitor createFileMonitor(String patternCode) {
MLog mLog = new MLog();
if (StringUtils.isNotBlank(this.linePrefix)) {
mLog.setCustomLinePattern(this.linePrefix);
@@ -272,7 +272,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
log.info("Empty data");
return;
}
- processLogLines(readResult, patternCode, ip, mLog);
+ processLogLines(readResult, patternCode, mLog);
});
monitor.setListener(defaultMonitorListener);
@@ -280,11 +280,11 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
/**
* Collect all data in the last row of data that has not been sent for
more than 10 seconds.
*/
- scheduleLastLineSender(mLog, readResult, patternCode, ip);
+ scheduleLastLineSender(mLog, readResult, patternCode);
return monitor;
}
- private void processLogLines(AtomicReference<ReadResult> readResult,
String patternCode, String ip, MLog mLog) {
+ private void processLogLines(AtomicReference<ReadResult> readResult,
String patternCode, MLog mLog) {
long currentTime = System.currentTimeMillis();
ReadResult result = readResult.get();
@@ -297,7 +297,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
if (line != null) {
try {
reentrantLock.lock();
- wrapDataToSend(line, readResult, patternCode, ip,
currentTime);
+ wrapDataToSend(line, readResult, patternCode, currentTime);
} finally {
reentrantLock.unlock();
}
@@ -307,7 +307,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
});
}
- private void scheduleLastLineSender(MLog mLog, AtomicReference<ReadResult>
readResult, String patternCode, String ip) {
+ private void scheduleLastLineSender(MLog mLog, AtomicReference<ReadResult>
readResult, String patternCode) {
lastFileLineScheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
Long appendTime = mLog.getAppendTime();
if (appendTime != null && Instant.now().toEpochMilli() -
appendTime > 10 * 1000) {
@@ -316,7 +316,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
String remainMsg = mLog.takeRemainMsg2();
if (null != remainMsg) {
log.info("start send last line, fileName:{},
patternCode:{}, data:{}", readResult.get().getFilePathName(), patternCode,
remainMsg);
- wrapDataToSend(remainMsg, readResult, patternCode,
ip, Instant.now().toEpochMilli());
+ wrapDataToSend(remainMsg, readResult, patternCode,
Instant.now().toEpochMilli());
}
} finally {
reentrantLock.unlock();
@@ -326,9 +326,9 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
}, 30, 30, TimeUnit.SECONDS);
}
- private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String patternCode, String localIp, long ct) {
+ private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult>
readResult, String patternCode, long ct) {
String filePathName = readResult.get().getFilePathName();
- LineMessage lineMessage = createLineMessage(lineMsg, readResult,
filePathName, patternCode, localIp, ct);
+ LineMessage lineMessage = createLineMessage(lineMsg, readResult,
filePathName, patternCode, ct);
updateChannelMemory(channelMemory, filePathName, getLogTypeEnum(), ct,
readResult);
lineMessageList.add(lineMessage);
diff --git
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/enums/FilterIdEnum.java
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/enums/FilterIdEnum.java
new file mode 100644
index 00000000..402a9dbc
--- /dev/null
+++
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/enums/FilterIdEnum.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.api.enums;
+
+import lombok.Getter;
+
+/**
+ * @author wtt
+ * @date 2025/9/15 9:58
+ * @version 1.0
+ */
+@Getter
+public enum FilterIdEnum {
+ FILTER_SPACE_ID(1, "space"),
+ FILTER_STORE_ID(2, "store"),
+ FILTER_TAIL_ID(3, "tail");
+ final int code;
+ final String desc;
+
+ FilterIdEnum(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+}
diff --git
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
index ae81d570..3b11a700 100644
---
a/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
+++
b/ozhera-log/log-api/src/main/java/org/apache/ozhera/log/api/model/msg/LineMessage.java
@@ -54,7 +54,7 @@ public class LineMessage implements Serializable {
public LineMessage() {
}
- public long getTimestamp() {
+ public long extractTimestamp() {
String value = extMap.get(KEY_COLLECT_TIMESTAMP);
if (value == null || "".equals(value)) {
return 0;
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
index b3524c7c..8efb2a6b 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/common/PathUtils.java
@@ -242,12 +242,10 @@ public class PathUtils {
}
private static void readFile(String filepath, String fileName,
List<String> list) throws FileNotFoundException, IOException {
- try {
- File file = new File(filepath);
- if (!file.isDirectory()) {
- return;
- } else if (file.isDirectory()) {
- String[] fileList = file.list();
+ File file = new File(filepath);
+ if (file.isDirectory()) {
+ String[] fileList = file.list();
+ if (null != fileList && fileList.length > 0) {
for (int i = 0; i < fileList.length; i++) {
String subPath;
if (filepath.endsWith("/")) {
@@ -263,14 +261,8 @@ public class PathUtils {
readFile(subPath, fileName, list);
}
}
-
}
- } catch (FileNotFoundException e) {
- throw e;
- } catch (IOException e) {
- throw e;
}
- return;
}
private static void readFile(String filepath, String fileName, String
dictionaries, List<String> list) throws FileNotFoundException, IOException {
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index 50ad6a3f..558d437e 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -129,7 +129,7 @@ public abstract class AbstractLogParser implements
LogParser {
if (!ret.containsKey(esKeyMap_timestamp) &&
logData.startsWith(LOG_PREFIX)) {
String timeStamp = StringUtils.substringBetween(logData,
LOG_PREFIX, LOG_SUFFFIX);
Long time = getTimestampFromString(timeStamp, collectStamp);
- ret.put(esKeyMap_timestamp, time);
+ ret.put(esKeyMap_timestamp, time != null ? time :
System.currentTimeMillis());
}
/**
* Special handling, only dates starting with a date in the file such
as yyyy-mm-dd HH:mm:ss will be extracted
@@ -137,7 +137,7 @@ public abstract class AbstractLogParser implements
LogParser {
if (!ret.containsKey(esKeyMap_timestamp) &&
logData.startsWith(specialTimePrefix)) {
String timeStamp = StringUtils.substring(logData, 0,
specialTimeLength);
Long time = getTimestampFromString(timeStamp, collectStamp);
- ret.put(esKeyMap_timestamp, time);
+ ret.put(esKeyMap_timestamp, time != null ? time :
System.currentTimeMillis());
}
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
index 6987a1ab..a76dd2ab 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
@@ -165,13 +165,13 @@ public class PlaceholderParser extends AbstractLogParser {
} else {
// 静态部分严格匹配
if (!remaining.startsWith(part.text)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
remaining = remaining.substring(part.text.length());
}
}
} catch (Exception e) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
return result;
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index d419cf66..0e809025 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -151,7 +151,27 @@ public class LogParserTest {
String parseScript = "[%s %s[2@ ] %s[-]] -[trace_id=%s] %s - %s";
String logData = "[INFO 2025-09-03 09:47:05.233 http-nio-1115-exec-4]
-[trace_id=47f8940234d777de50d17685171a6183]
com.test.update.web.controller.v0.UpdatesController - request info
d:sapphiren_id_global to test";
Long collectStamp = Instant.now().toEpochMilli();
- Integer parserType =
LogParserFactory.LogParserEnum.PLACEHOLDER_PARSE.getCode();
+ Integer parserType =
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
+ LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName,
keyOrderList);
+ Map<String, Object> parse = customParse.parse(logData, "", 1l,
collectStamp, "");
+ System.out.println(parse);
+ List<String> dataList = customParse.parseLogData(logData);
+
+ System.out.println(customParse.getTimestampFromString("2023-08-25
10:46:09.239", collectStamp));
+ stopwatch.stop();
+ log.info("cost time:{}", stopwatch.elapsed().toMillis());
+ }
+
+ @Test
+ public void LogPlaceholderParserTest1() throws Exception {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ String keyList =
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:text,podName:keyword,ng_id:text,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
+ String keyOrderList =
"timestamp:1,level:1,traceId:1,threadName:1,className:1,line:1,methodName:1,message:1,podName:1,ng_id:2,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3,tailId:3,spaceId:3,storeId:3,deploySpace:3";
+ String valueList = "0,2,5,4,-1,6,-1,7,1,3";
+ String parseScript = "[%s]-[%s]-[%s]-[%s]-[%s]-[%s]-[%s]-%s";
+ String logData = "[2025-09-16 14:50:22.718]
[mit-after-sales-asp-spareparts-service-1201735-5486d457fd-hrcmk] [INFO] []
[DubboServerHandler-10.158.197.220:20880-thread-196]
[fa7ff7698c88a87dd9f5d1541ee33130] [?.?(?:?)] transferTypeValueIllegalNew
removed,
values:[C015520011700],rules:[{\\\"id\\\":805868,\\\"ruleId\\\":2674,\\\"type\\\":15,\\\"value\\\":\\\"SCNE010076600\\\"}]\\r\\n";
+ Long collectStamp = Instant.now().toEpochMilli();
+ Integer parserType =
LogParserFactory.LogParserEnum.CUSTOM_PARSE.getCode();
LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName,
keyOrderList);
Map<String, Object> parse = customParse.parse(logData, "", 1l,
collectStamp, "");
System.out.println(parse);
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
index b2ad8170..f46fb8d5 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/PathUtilsTest.java
@@ -219,10 +219,10 @@ public class PathUtilsTest {
@Test
public void test81() {
- String logPattern = "/home/work/log/error-2022-08-04_05_1.log";
+ String logPattern = "/home/work/logs/.*_total.log";
Pattern pattern = Pattern.compile(logPattern);
- Assert.assertEquals(false,
pattern.matcher("/home/work/log/aa/server.log").matches());
+ Assert.assertEquals(true,
pattern.matcher("/home/work/logs/log_total.log").matches());
Assert.assertEquals(false,
pattern.matcher("/home/work/log/bb/server.log").matches());
Assert.assertEquals(false,
pattern.matcher("/home/work/log/aa/bb/server.log").matches());
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
index f0e1086b..a2e7436b 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/CommonExtensionService.java
@@ -18,6 +18,7 @@
*/
package org.apache.ozhera.log.manager.service.extension.common;
+import org.apache.ozhera.log.api.enums.FilterIdEnum;
import org.apache.ozhera.log.manager.model.vo.LogQuery;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -56,4 +57,6 @@ public interface CommonExtensionService {
String getSpaceDataId(Long spaceId);
List<String> queryMachineRegions();
+
+ Boolean matchesCondition(Long space, FilterIdEnum idEnum, Long id);
}
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
index 947782d5..679b022a 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/extension/common/DefaultCommonExtensionService.java
@@ -18,12 +18,13 @@
*/
package org.apache.ozhera.log.manager.service.extension.common;
-import org.apache.ozhera.log.api.enums.MQSourceEnum;
-import org.apache.ozhera.log.api.enums.MachineRegionEnum;
-import org.apache.ozhera.log.manager.model.vo.LogQuery;
import com.xiaomi.youpin.docean.anno.Service;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.log.api.enums.FilterIdEnum;
+import org.apache.ozhera.log.api.enums.MQSourceEnum;
+import org.apache.ozhera.log.api.enums.MachineRegionEnum;
+import org.apache.ozhera.log.manager.model.vo.LogQuery;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
@@ -106,6 +107,11 @@ public class DefaultCommonExtensionService implements
CommonExtensionService {
return
Arrays.stream(MachineRegionEnum.values()).map(MachineRegionEnum::getEn).collect(Collectors.toList());
}
+ @Override
+ public Boolean matchesCondition(Long spaceId, FilterIdEnum idEnum, Long
id) {
+ return true;
+ }
+
@Getter
public static enum QueryTypeEnum {
ID,
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
index d4c8373f..878d6a9d 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/HeralogHomePageServiceImpl.java
@@ -19,7 +19,11 @@
package org.apache.ozhera.log.manager.service.impl;
import com.google.common.collect.Lists;
+import com.xiaomi.youpin.docean.anno.Service;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.ozhera.app.api.response.AppBaseInfo;
+import org.apache.ozhera.log.api.enums.FilterIdEnum;
import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.common.Result;
import org.apache.ozhera.log.exception.CommonError;
@@ -34,9 +38,8 @@ import
org.apache.ozhera.log.manager.model.pojo.MilogLogStoreDO;
import org.apache.ozhera.log.manager.model.pojo.MilogLogTailDo;
import org.apache.ozhera.log.manager.model.pojo.MilogStoreSpaceAuth;
import org.apache.ozhera.log.manager.service.HeralogHomePageService;
-import com.xiaomi.youpin.docean.anno.Service;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
+import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionService;
+import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
import org.jetbrains.annotations.Nullable;
import javax.annotation.Resource;
@@ -44,27 +47,28 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.stream.Collectors;
@Slf4j
@Service
public class HeralogHomePageServiceImpl implements HeralogHomePageService {
-
+
@Resource
private MilogLogTailDao milogLogtailDao;
-
+
@Resource
private HeraAppServiceImpl heraAppService;
-
+
@Resource
private MilogLogstoreDao milogLogstoreDao;
-
+
@Resource
private MilogStoreSpaceAuthDao milogStoreSpaceAuthDao;
-
+
private List<ValueDTO<String>> milogpattern;
-
+
+ private CommonExtensionService commonExtensionService;
+
{
String pattern = Config.ins().get("milogpattern", "");
String[] split = pattern.split(",");
@@ -74,7 +78,11 @@ public class HeralogHomePageServiceImpl implements
HeralogHomePageService {
}
milogpattern = valueDTOS;
}
-
+
+ public void init() {
+ commonExtensionService =
CommonExtensionServiceFactory.getCommonExtensionService();
+ }
+
@Override
public Result<Map<String, Object>> milogAccess() {
Long total = heraAppService.getAppCount();
@@ -84,7 +92,7 @@ public class HeralogHomePageServiceImpl implements
HeralogHomePageService {
map.put("access", access);
return new Result<>(CommonError.Success.getCode(),
CommonError.Success.getMessage(), map);
}
-
+
@Override
public Result<List<UnAccessAppDTO>> unAccessAppList() {
List<AppBaseInfo> appBaseInfos = heraAppService.queryAllExistsApp();
@@ -99,30 +107,33 @@ public class HeralogHomePageServiceImpl implements
HeralogHomePageService {
}
return new Result<>(CommonError.Success.getCode(),
CommonError.Success.getMessage(), list);
}
-
+
@Override
public Result<List<MilogSpaceTreeDTO>> getMilogSpaceTree(Long spaceId) {
List<MilogLogStoreDO> stores = getMilogLogStoreDOS(spaceId);
- List<MilogSpaceTreeDTO> spaceTreeDTOS =
stores.stream().map(milogLogstoreDO -> {
- Long logstoreDOId = milogLogstoreDO.getId();
+ assert stores != null;
+ List<MilogSpaceTreeDTO> spaceTrees =
stores.stream().map(milogLogstoreDO -> {
+ Long logStoreDoId = milogLogstoreDO.getId();
MilogSpaceTreeDTO milogSpaceTreeDTO = new MilogSpaceTreeDTO();
milogSpaceTreeDTO.setLabel(milogLogstoreDO.getLogstoreName());
- milogSpaceTreeDTO.setValue(logstoreDOId);
- List<MilogLogTailDo> logTailDos =
milogLogtailDao.getMilogLogtailByStoreId(logstoreDOId);
+ milogSpaceTreeDTO.setValue(logStoreDoId);
+ List<MilogLogTailDo> logTailDos =
milogLogtailDao.getMilogLogtailByStoreId(logStoreDoId);
if (CollectionUtils.isNotEmpty(logTailDos)) {
- List<MapDTO<String, Long>> collect =
logTailDos.stream().map(logTailDo -> {
- MapDTO<String, Long> mapDTO = new MapDTO();
- mapDTO.setValue(logTailDo.getId());
- mapDTO.setLabel(logTailDo.getTail());
- return mapDTO;
- }).collect(Collectors.toList());
+ List<MapDTO<String, Long>> collect = logTailDos.stream()
+ .filter(logTailDo ->
commonExtensionService.matchesCondition(spaceId, FilterIdEnum.FILTER_TAIL_ID,
logTailDo.getId()))
+ .map(logTailDo -> {
+ MapDTO<String, Long> mapDTO = new MapDTO<>();
+ mapDTO.setValue(logTailDo.getId());
+ mapDTO.setLabel(logTailDo.getTail());
+ return mapDTO;
+ }).collect(Collectors.toList());
milogSpaceTreeDTO.setChildren(collect);
}
return milogSpaceTreeDTO;
}).collect(Collectors.toList());
- return Result.success(spaceTreeDTOS);
+ return Result.success(spaceTrees);
}
-
+
/**
* Query the store that originally belonged to the space, and query the
authorized store
*
@@ -132,21 +143,24 @@ public class HeralogHomePageServiceImpl implements
HeralogHomePageService {
@Override
@Nullable
public List<MilogLogStoreDO> getMilogLogStoreDOS(Long spaceId) {
- List<MilogLogStoreDO> storeDOS = Lists.newArrayList();
+ List<MilogLogStoreDO> storeDoS = Lists.newArrayList();
List<MilogLogStoreDO> stores =
milogLogstoreDao.getMilogLogstoreBySpaceId(spaceId);
List<MilogStoreSpaceAuth> storeSpaceAuths =
milogStoreSpaceAuthDao.queryStoreIdsBySpaceId(spaceId);
if (CollectionUtils.isNotEmpty(stores)) {
- storeDOS = stores;
+ storeDoS = stores;
}
if (CollectionUtils.isNotEmpty(storeSpaceAuths)) {
List<MilogLogStoreDO> collect = storeSpaceAuths.stream()
.map(storeSpaceAuth ->
milogLogstoreDao.queryById(storeSpaceAuth.getStoreId()))
- .filter(Objects::nonNull).collect(Collectors.toList());
- storeDOS.addAll(collect);
+ .toList();
+ storeDoS.addAll(collect);
}
- return storeDOS;
+ if (CollectionUtils.isNotEmpty(storeDoS)) {
+ storeDoS = storeDoS.stream().filter(store -> null != store &&
commonExtensionService.matchesCondition(spaceId, FilterIdEnum.FILTER_STORE_ID,
store.getId())).toList();
+ }
+ return storeDoS;
}
-
+
@Override
public Result<List<ValueDTO<String>>> getMiloglogAccessPattern() {
return new Result<>(CommonError.Success.getCode(),
CommonError.Success.getMessage(), milogpattern);
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
index 712ba725..f3bc6d45 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/MilogConfigNacosServiceImpl.java
@@ -302,7 +302,12 @@ public class MilogConfigNacosServiceImpl implements
MilogConfigNacosService {
private synchronized MilogSpaceData dealSpaceConfigByRule(
String motorRoomEn, Long spaceId, Long storeId, Long tailId,
Integer type, String changeType) {
- MilogSpaceData existConfig =
getSpaceConfigNacosProvider(motorRoomEn).getConfig(spaceId);
+ SpaceConfigNacosProvider configNacosProvider =
getSpaceConfigNacosProvider(motorRoomEn);
+ if (null == configNacosProvider) {
+ log.error("configNacosProvider is null,contact
us,motorRoomEn:{},tailId:{},changeType:{}", motorRoomEn, tailId, changeType);
+ return null;
+ }
+ MilogSpaceData existConfig = configNacosProvider.getConfig(spaceId);
// new configuration
if (null == existConfig ||
OperateEnum.ADD_OPERATE.getCode().equals(type)) {
// The configuration is not configured yet, initialize the
configuration
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
index f2aa07b0..b82efba4 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/SpaceConfigNacosPublisher.java
@@ -21,6 +21,7 @@ package org.apache.ozhera.log.manager.service.nacos.impl;
import com.alibaba.nacos.api.config.ConfigService;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
import org.apache.ozhera.log.model.MilogSpaceData;
@@ -47,6 +48,10 @@ public class SpaceConfigNacosPublisher implements
DynamicConfigPublisher<MilogSp
String dataId =
CommonExtensionServiceFactory.getCommonExtensionService().getLogManagePrefix()
+ TAIL_CONFIG_DATA_ID + uniqueSpace;
try {
if (null != configService) {
+ if (StringUtils.isBlank(dataId)) {
+ log.error("dataId is null,uniqueSpace:{},config:{}",
uniqueSpace, configJson);
+ return;
+ }
configService.publishConfig(dataId, DEFAULT_GROUP_ID,
gson.toJson(config));
} else {
log.warn("configService is null,uniqueSpace:{},config:{}",
uniqueSpace, configJson);
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
index 2768249c..124c7f17 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosProvider.java
@@ -49,7 +49,12 @@ public class StreamConfigNacosProvider implements
DynamicConfigProvider<MiLogStr
if (null == spaceId) {
rules =
configService.getConfig(CommonExtensionServiceFactory.getCommonExtensionService().getLogManagePrefix()
+ NAMESPACE_CONFIG_DATA_ID, DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
} else {
- rules =
configService.getConfig(CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId),
DEFAULT_GROUP_ID, DEFAULT_TIME_OUT_MS);
+ String spaceDataId =
CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId);
+ if (StringUtils.isBlank(spaceDataId)) {
+ log.error("The space does not exist, please check the
spaceId,spaceId:{}", spaceId);
+ return null;
+ }
+ rules = configService.getConfig(spaceDataId, DEFAULT_GROUP_ID,
DEFAULT_TIME_OUT_MS);
}
log.info("The NACOS query log is initially configured:{}", rules);
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
index 2682d64d..fddd0525 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/nacos/impl/StreamConfigNacosPublisher.java
@@ -23,6 +23,7 @@ import com.alibaba.nacos.api.exception.NacosException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
import org.apache.ozhera.log.manager.service.nacos.DynamicConfigPublisher;
import org.apache.ozhera.log.model.MiLogStreamConfig;
@@ -49,7 +50,12 @@ public class StreamConfigNacosPublisher implements
DynamicConfigPublisher<MiLogS
return;
}
try {
-
configService.publishConfig(CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId),
DEFAULT_GROUP_ID, gson.toJson(config));
+ String spaceDataId =
CommonExtensionServiceFactory.getCommonExtensionService().getSpaceDataId(spaceId);
+ if (StringUtils.isBlank(spaceDataId)) {
+ log.error("spaceDataId is null,spaceId:{}", spaceId);
+ return;
+ }
+ configService.publishConfig(spaceDataId, DEFAULT_GROUP_ID,
gson.toJson(config));
} catch (NacosException e) {
log.error("Create namespace push data exceptions, parameters:{}",
gson.toJson(config), e);
}
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
index bb31de93..8088df7c 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
@@ -131,7 +131,7 @@ public class LogDataTransfer {
private Map<String, Object> parseMessage(LineMessage lineMessage) {
String ip = lineMessage.getProperties(LineMessage.KEY_IP);
Long lineNumber = lineMessage.getLineNumber();
- Map<String, Object> dataMap =
logParser.parse(lineMessage.getMsgBody(), ip, lineNumber,
lineMessage.getTimestamp(), lineMessage.getFileName());
+ Map<String, Object> dataMap =
logParser.parse(lineMessage.getMsgBody(), ip, lineNumber,
lineMessage.extractTimestamp(), lineMessage.getFileName());
putCommonData(dataMap);
return dataMap;
}
@@ -140,12 +140,6 @@ public class LogDataTransfer {
return objectMapper.readValue(msg, LineMessage.class);
}
- public static void main(String[] args) throws JsonProcessingException {
- String message =
"{\"extMap\":{\"ct\":\"1756780824719\",\"ip\":\"10.7.84.220\",\"tag\":\"tags_392_120935_132193\",\"type\":\"1\"},\"fileName\":\"/home/work/log/nr-promotion-promotion-admin-global-1080348-c-67bbfb479c-27t45/promotion-admin-global/server.log\",\"lineNumber\":2642,\"msgBody\":\"2025-09-02
10:40:21,610|INFO
|4a1378575e03c5d38fa9fd0f7351227a|call_client213|c.x.n.p.a.g.i.u.filter.DubboCommonFilter|dubbo
invoke. service:com.xiaomi.nr.promotion.admin.global.impl.DubboHea [...]
- LineMessage lineMessage = new ObjectMapper().readValue(message,
LineMessage.class);
- System.out.println(lineMessage.getTimestamp());
- }
-
private void putCommonData(Map<String, Object> dataMap) {
dataMap.putIfAbsent(LOG_STREAM_SPACE_ID,
sinkJobConfig.getLogSpaceId());
dataMap.putIfAbsent(LOG_STREAM_STORE_ID,
sinkJobConfig.getLogStoreId());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]