This is an automated email from the ASF dual-hosted git repository.
gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new fc39363a refactor: log monitoring modification and content parsing
optimization (#561)
fc39363a is described below
commit fc39363a941a23355e95622d9feb9817ab6cdbf9
Author: wtt <[email protected]>
AuthorDate: Wed Mar 12 18:58:16 2025 +0800
refactor: log monitoring modification and content parsing optimization
(#561)
* refactor(log): refactoring log parsing logic
* refactor: reconstruct file monitoring logic and optimize log parsing
function
* refactor: Optimize code comments for EsDataServiceImpl class
* refactor: optimize code comments for EsDataServiceImpl class
---
.../ozhera/log/agent/channel/ChannelEngine.java | 58 ++++---
.../log/agent/channel/ChannelServiceImpl.java | 43 ++---
.../agent/channel/WildcardChannelServiceImpl.java | 2 +-
.../ozhera/log/agent/channel/file/FileMonitor.java | 2 +-
.../agent/channel/file/InodeFileComparator.java | 42 +++--
.../channel/file/LogFileAlterationObserver.java | 49 ++++--
.../listener/DefaultFileMonitorListener.java | 56 +++---
.../channel/listener/FileMonitorListener.java | 6 +-
.../log/agent/channel/memory/ChannelMemory.java | 2 +-
.../ozhera/log/agent/common/ChannelUtil.java | 2 +-
.../ozhera/log/agent/common/ExecutorUtil.java | 4 +-
.../apache/ozhera/log/agent/FilterMonitorTest.java | 10 +-
.../org/apache/ozhera/log/model/LogtailConfig.java | 3 +
.../org/apache/ozhera/log/model/SinkConfig.java | 1 +
.../apache/ozhera/log/parse/AbstractLogParser.java | 14 +-
.../apache/ozhera/log/parse/CustomLogParser.java | 10 +-
.../org/apache/ozhera/log/parse/JsonLogParser.java | 32 ++--
.../org/apache/ozhera/log/parse/LogParser.java | 8 +-
.../apache/ozhera/log/parse/LogParserFactory.java | 8 +-
.../ozhera/log/parse/SeparatorLogParser.java | 7 +-
.../org/apache/ozhera/log/utils/DateUtils.java | 188 ++++++++++++++-------
.../apache/ozhera/log/common/LogParserTest.java | 39 +++--
.../manager/service/impl/EsDataServiceImpl.java | 107 ++++++++++--
.../log/stream/config/MilogConfigListener.java | 11 +-
.../apache/ozhera/log/stream/job/JobManager.java | 8 +-
.../ozhera/log/stream/job/LogDataTransfer.java | 13 +-
.../ozhera/log/stream/plugin/es/EsPlugin.java | 2 +-
27 files changed, 477 insertions(+), 250 deletions(-)
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index a860f1c7..c6312672 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -18,6 +18,7 @@
*/
package org.apache.ozhera.log.agent.channel;
+import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Pair;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -26,6 +27,14 @@ import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.data.push.rpc.RpcClient;
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
import com.xiaomi.mone.file.ILogFile;
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.anno.Lookup;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.plugin.config.Config;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.agent.channel.comparator.*;
import org.apache.ozhera.log.agent.channel.listener.DefaultFileMonitorListener;
import org.apache.ozhera.log.agent.channel.listener.FileMonitorListener;
@@ -45,14 +54,6 @@ import org.apache.ozhera.log.api.enums.OperateEnum;
import org.apache.ozhera.log.api.model.vo.UpdateLogProcessCmd;
import org.apache.ozhera.log.common.Constant;
import org.apache.ozhera.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Lookup;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.Config;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import java.text.NumberFormat;
import java.util.*;
@@ -63,6 +64,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.ozhera.log.common.Constant.GSON;
+import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
import static org.apache.ozhera.log.common.PathUtils.PATH_WILDCARD;
import static org.apache.ozhera.log.common.PathUtils.SEPARATOR;
@@ -214,7 +216,7 @@ public class ChannelEngine {
log.info("realChannelService,id:{}", channelId);
try {
channelService.start();
- fileMonitorListener.addChannelService(channelService);
+ fileMonitorListener.addChannelService(abstractChannelService);
successChannelIds.add(channelId);
} catch (RejectedExecutionException e) {
log.error("The thread pool is full.id:{}", channelId, e);
@@ -266,21 +268,32 @@ public class ChannelEngine {
if (null == agentMemoryService) {
agentMemoryService = new
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
AgentMemoryService.DEFAULT_BASE_PATH));
}
- ChannelService channelService;
- Input input = channelDefine.getInput();
- boolean matchWildcard =
Arrays.stream(input.getLogPattern().split(",")).anyMatch(data ->
StringUtils.substringAfterLast(data, SEPARATOR).contains(PATH_WILDCARD));
- if (matchWildcard) {
- channelService = new WildcardChannelServiceImpl(exporter,
agentMemoryService, channelDefine, filterChain, memoryBasePath);
- } else {
- channelService = new ChannelServiceImpl(exporter,
agentMemoryService, channelDefine, filterChain);
- }
- return channelService;
+ return createChannelService(channelDefine, exporter, filterChain);
} catch (Throwable e) {
- log.error("channelServiceTrans exception, channelDefine:{},
exception:{}", gson.toJson(channelDefine), e);
+ log.error("channelServiceTrans exception, channelDefine:{}",
gson.toJson(channelDefine), e);
}
return null;
}
+ private ChannelService createChannelService(ChannelDefine channelDefine,
MsgExporter exporter, FilterChain filterChain) {
+ Input input = channelDefine.getInput();
+ String logType = channelDefine.getInput().getType();
+ boolean containsWildcard =
isWildcardAllowedForLogType(input.getLogPattern(), logType);
+ if (containsWildcard || FileUtil.exist(input.getLogPattern())) {
+ return new ChannelServiceImpl(exporter, agentMemoryService,
channelDefine, filterChain);
+ } else {
+ return new WildcardChannelServiceImpl(exporter,
agentMemoryService, channelDefine, filterChain, memoryBasePath);
+ }
+ }
+
+ private boolean isWildcardAllowedForLogType(String logPattern, String
logType) {
+ if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType)) {
+ return true;
+ }
+ return Arrays.stream(logPattern.split(SYMBOL_COMMA))
+ .noneMatch(data -> StringUtils.substringAfterLast(data,
SEPARATOR).contains(PATH_WILDCARD));
+ }
+
private void preCheckChannelDefine(ChannelDefine channelDefine) {
Preconditions.checkArgument(null != channelDefine, "channelDefine can
not be null");
Preconditions.checkArgument(null != channelDefine.getInput(),
"channelDefine.input can not be null");
@@ -434,14 +447,15 @@ public class ChannelEngine {
try {
if (directDel || CollectionUtils.isNotEmpty(channelDels)) {
log.info("[delete config]data:{}", gson.toJson(channelDels));
- List<Long> channelIdDels =
channelDels.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
+ List<Long> channelIdDels =
channelDels.stream().map(ChannelDefine::getChannelId).toList();
List<ChannelService> tempChannelServiceList =
Lists.newArrayList();
channelServiceList.forEach(channelService -> {
- Long channelId = ((AbstractChannelService)
channelService).getChannelDefine().getChannelId();
+ AbstractChannelService abstractChannelService =
(AbstractChannelService) channelService;
+ Long channelId =
abstractChannelService.getChannelDefine().getChannelId();
if (channelIdDels.contains(channelId)) {
log.info("[delete config]channelService:{}",
channelId);
channelService.close();
-
fileMonitorListener.removeChannelService(channelService);
+
fileMonitorListener.removeChannelService(abstractChannelService);
tempChannelServiceList.add(channelService);
this.channelDefineList.removeIf(channelDefine -> {
if
(channelDefine.getChannelId().equals(channelId)) {
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index bcf8c3aa..3d976ddd 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -42,7 +42,6 @@ import org.apache.ozhera.log.api.enums.K8sPodTypeEnum;
import org.apache.ozhera.log.api.enums.LogTypeEnum;
import org.apache.ozhera.log.api.model.meta.FilterConf;
import org.apache.ozhera.log.api.model.msg.LineMessage;
-import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.common.Constant;
import org.apache.ozhera.log.common.PathUtils;
import org.apache.ozhera.log.utils.NetUtil;
@@ -66,10 +65,11 @@ import static
org.apache.ozhera.log.common.PathUtils.SEPARATOR;
@Slf4j
public class ChannelServiceImpl extends AbstractChannelService {
- private AgentMemoryService memoryService;
+ private final AgentMemoryService memoryService;
private MsgExporter msgExporter;
+ @Getter
private ChannelDefine channelDefine;
private ChannelMemory channelMemory;
@@ -80,7 +80,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
@Getter
private final ConcurrentHashMap<String, Future> futureMap = new
ConcurrentHashMap<>();
- private Set<String> delFileCollList = new CopyOnWriteArraySet<>();
+ private final Set<String> delFileCollList = new CopyOnWriteArraySet<>();
private final Map<String, Long> reOpenMap = new HashMap<>();
private final Map<String, Long> fileReadMap = new ConcurrentHashMap<>();
@@ -89,13 +89,13 @@ public class ChannelServiceImpl extends
AbstractChannelService {
private ScheduledFuture<?> lastFileLineScheduledFuture;
- private Gson gson = Constant.GSON;
+ private final Gson gson = Constant.GSON;
- private List<LineMessage> lineMessageList = new ArrayList<>();
+ private final List<LineMessage> lineMessageList = new ArrayList<>();
- private ReentrantLock fileColLock = new ReentrantLock();
+ private final ReentrantLock fileColLock = new ReentrantLock();
- private ReentrantLock fileReopenLock = new ReentrantLock();
+ private final ReentrantLock fileReopenLock = new ReentrantLock();
private volatile long lastSendTime = System.currentTimeMillis();
@@ -108,19 +108,15 @@ public class ChannelServiceImpl extends
AbstractChannelService {
*/
private boolean collectOnce;
- private FilterChain chain;
+ private final FilterChain chain;
/**
* The file path to monitor
*/
- private List<MonitorFile> monitorFileList;
+ private final List<MonitorFile> monitorFileList;
private LogTypeEnum logTypeEnum;
- private String logPattern;
-
- private String logSplitExpress;
-
private String linePrefix;
public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService
memoryService, ChannelDefine channelDefine, FilterChain chain) {
@@ -171,8 +167,8 @@ public class ChannelServiceImpl extends
AbstractChannelService {
Long channelId = channelDefine.getChannelId();
Input input = channelDefine.getInput();
- this.logPattern = input.getLogPattern();
- this.logSplitExpress = input.getLogSplitExpress();
+ String logPattern = input.getLogPattern();
+ String logSplitExpress = input.getLogSplitExpress();
this.linePrefix = input.getLinePrefix();
String logType = channelDefine.getInput().getType();
@@ -312,9 +308,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
return;
}
long ct = System.currentTimeMillis();
- readResult.get().getLines().stream()
- .filter(l ->
!shouldFilterLogs(channelDefine.getFilterLogLevelList(), l))
- .forEach(l -> {
+ readResult.get().getLines().stream().forEach(l -> {
String logType = channelDefine.getInput().getType();
LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
// Multi-line application log type and opentelemetry type are
used to determine the exception stack
@@ -354,7 +348,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
try {
String remainMsg = mLog.takeRemainMsg2();
if (null != remainMsg) {
- log.info("start send last
line,pattern:{},patternCode:{},data:{}", pattern, patternCode, remainMsg);
+ log.info("start send last
line,pattern:{},patternCode:{},ip:{},data:{}", pattern, patternCode,
getTailPodIp(pattern), remainMsg);
wrapDataToSend(remainMsg,
referenceEntry.getValue().getValue(), pattern, patternCode,
getTailPodIp(pattern), appendTime);
}
} finally {
@@ -441,7 +435,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
}
if (!collectOnce && !snapshot.isEmpty()) {
- String jsonMap = gson.toJson(snapshot);
+ String jsonMap = gson.toJson(snapshot.keySet());
log.info("fileProgressMap: {}", jsonMap);
}
}
@@ -505,7 +499,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
@Override
public void close() {
- log.info("Delete the current collection task,channelId:{}",
getChannelId());
+ log.info("Delete the current collection
task,channelId:{},logPattern:{}", getChannelId(),
getChannelDefine().getInput().getLogPattern());
//1.Stop log capture
for (Map.Entry<String, ILogFile> fileEntry : logFileMap.entrySet()) {
fileEntry.getValue().setStop(true);
@@ -525,7 +519,7 @@ public class ChannelServiceImpl extends
AbstractChannelService {
for (Future future : futureMap.values()) {
future.cancel(false);
}
- log.info("stop file monitor,fileName:",
logFileMap.keySet().stream().collect(Collectors.joining(SYMBOL_COMMA)));
+ log.info("stop file monitor,fileName:{}", String.join(SYMBOL_COMMA,
logFileMap.keySet()));
lineMessageList.clear();
reOpenMap.clear();
fileReadMap.clear();
@@ -615,11 +609,6 @@ public class ChannelServiceImpl extends
AbstractChannelService {
return monitorFileList;
}
- @Override
- public ChannelDefine getChannelDefine() {
- return channelDefine;
- }
-
public ChannelMemory getChannelMemory() {
return channelMemory;
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index b9ad7963..2db94d8f 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -222,7 +222,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
.distinct()
.toList();
return expressions.size() == 1 ?
- expressions.get(0) :
+ expressions.getFirst() :
expressions.stream().collect(Collectors.joining("|",
MULTI_FILE_PREFIX, MULTI_FILE_SUFFIX));
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
index f6b31c76..8af23de3 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/FileMonitor.java
@@ -42,7 +42,7 @@ public class FileMonitor implements FileWatcher {
FileAlterationMonitor monitor = new FileAlterationMonitor(10000);
log.info("agent monitor files:{}", GSON.toJson(watchList));
for (String watch : watchList) {
- FileAlterationObserver observer = new
LogFileAlterationObserver(new File(watch));
+ FileAlterationObserver observer = new
LogFileAlterationObserver(new File(watch), File::exists);
observer.addListener(new FileListener(consumer));
log.info("## agent monitor file:{}, filePattern:{}", watch,
filePattern);
monitor.addObserver(observer);
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
index 117cbb53..7713c4a4 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/InodeFileComparator.java
@@ -19,11 +19,11 @@
package org.apache.ozhera.log.agent.channel.file;
import com.google.common.collect.Lists;
-import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
-import org.apache.ozhera.log.agent.common.ChannelUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.comparator.DefaultFileComparator;
import org.apache.commons.io.comparator.NameFileComparator;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import org.apache.ozhera.log.agent.common.ChannelUtil;
import java.io.File;
import java.util.*;
@@ -49,33 +49,41 @@ public class InodeFileComparator extends
DefaultFileComparator {
@Override
public int compare(File file1, File file2) {
+// log.info("InodeFileComparator compare
file1:{},file2:{},filePaths:{}", file1, file2, GSON.toJson(filePaths));
int sort = fileComparator.compare(file1, file2);
- if (file1.isDirectory() || file2.isDirectory()) {
- return sort;
- }
- if (sort == 0 && filePaths.contains(file1.getAbsolutePath())) {
- //The file name is the same
- Long oldInode;
- if (INODE_MAP.containsKey(file1.getAbsolutePath())) {
- oldInode = INODE_MAP.get(file1.getAbsolutePath());
- } else {
- oldInode =
ChannelUtil.buildUnixFileNode(file1.getAbsolutePath()).getSt_ino();
- INODE_MAP.put(file1.getAbsolutePath(), oldInode);
+ try {
+ if (file1.isDirectory() || file2.isDirectory()) {
+ return sort;
}
- ChannelMemory.UnixFileNode unixFileNode2 =
ChannelUtil.buildUnixFileNode(file2.getAbsolutePath());
- if (!Objects.equals(oldInode, unixFileNode2.getSt_ino())) {
- INODE_MAP.put(file2.getAbsolutePath(),
unixFileNode2.getSt_ino());
- return 1;
+ if (sort == 0 && filePaths.contains(file1.getAbsolutePath())) {
+ //The file name is the same
+// log.info("INODE_MAP:{}", GSON.toJson(INODE_MAP));
+ Long oldInode;
+ if (INODE_MAP.containsKey(file1.getAbsolutePath())) {
+ oldInode = INODE_MAP.get(file1.getAbsolutePath());
+ } else {
+ oldInode =
ChannelUtil.buildUnixFileNode(file1.getAbsolutePath()).getSt_ino();
+ INODE_MAP.put(file1.getAbsolutePath(), oldInode);
+ }
+ ChannelMemory.UnixFileNode unixFileNode2 =
ChannelUtil.buildUnixFileNode(file2.getAbsolutePath());
+ if (!Objects.equals(oldInode, unixFileNode2.getSt_ino())) {
+ INODE_MAP.put(file2.getAbsolutePath(),
unixFileNode2.getSt_ino());
+ return 1;
+ }
}
+ } catch (Exception e) {
+ log.error("InodeFileComparator compare error,file1:{},file2:{}",
file1, file2, e);
}
return sort;
}
public static void addFile(String filePath) {
+ log.info("InodeFileComparator add file : {}", filePath);
filePaths.add(filePath);
}
public static void removeFile(String filePath) {
+ log.info("InodeFileComparator remove file : {}", filePath);
filePaths.remove(filePath);
}
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
index 42d5a0bb..5fda5b01 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/file/LogFileAlterationObserver.java
@@ -84,26 +84,31 @@ public class LogFileAlterationObserver extends
FileAlterationObserver {
@Override
public void checkAndNotify() {
+ try {
+ /* fire onStart() */
+ for (FileAlterationListener listener : getListeners()) {
+ listener.onStart(this);
+ }
- /* fire onStart() */
- for (FileAlterationListener listener : getListeners()) {
- listener.onStart(this);
- }
-
- /* fire directory/file events */
- File rootFile = getDirectory();
- if (null == childRootEntry) {
- childRootEntry = (FileEntry) ReflectUtil.getFieldValue(this,
"rootEntry");
- }
- if (rootFile.exists()) {
- checkAndNotify(childRootEntry, childRootEntry.getChildren(),
listFiles(rootFile));
- } else if (childRootEntry.isExists()) {
- checkAndNotify(childRootEntry, childRootEntry.getChildren(),
FileUtils.EMPTY_FILE_ARRAY);
- }
+ /* fire directory/file events */
+ File rootFile = getDirectory();
+ if (null == childRootEntry) {
+ childRootEntry = (FileEntry) ReflectUtil.getFieldValue(this,
"rootEntry");
+ }
+ if (rootFile.exists()) {
+// log.info("LogFileAlterationObserver checkAndNotify
rootFile:{}", rootFile.getAbsolutePath());
+ checkAndNotify(childRootEntry, childRootEntry.getChildren(),
listFiles(rootFile));
+ } else if (childRootEntry.isExists()) {
+// log.info("LogFileAlterationObserver checkAndNotify
rootFile:{}", childRootEntry.getFile().getAbsolutePath());
+ checkAndNotify(childRootEntry, childRootEntry.getChildren(),
FileUtils.EMPTY_FILE_ARRAY);
+ }
- /* fire onStop() */
- for (FileAlterationListener listener : getListeners()) {
- listener.onStop(this);
+ /* fire onStop() */
+ for (FileAlterationListener listener : getListeners()) {
+ listener.onStop(this);
+ }
+ } catch (Exception e) {
+ log.error("LogFileAlterationObserver checkAndNotify", e);
}
}
@@ -112,6 +117,9 @@ public class LogFileAlterationObserver extends
FileAlterationObserver {
if (null == childComparator) {
childComparator = (Comparator) ReflectUtil.getFieldValue(this,
"comparator");
}
+// List<File> oldFileList =
Arrays.stream(previous).map(FileEntry::getFile).collect(Collectors.toList());
+// List<String> newFileList =
Arrays.stream(files).map(File::getAbsolutePath).collect(Collectors.toList());
+// log.info("childComparator:{},previous:{},files:{}", childComparator,
GSON.toJson(oldFileList), GSON.toJson(newFileList));
FileEntry[] current = files.length > 0 ? new FileEntry[files.length] :
EMPTY_ENTRIES;
for (FileEntry entry : previous) {
while (c < files.length &&
childComparator.compare(entry.getFile(), files[c]) > 0) {
@@ -120,6 +128,7 @@ public class LogFileAlterationObserver extends
FileAlterationObserver {
c++;
}
if (c < files.length && childComparator.compare(entry.getFile(),
files[c]) == 0) {
+ doMatch(entry, files[c]);
checkAndNotify(entry, entry.getChildren(),
listFiles(files[c]));
current[c] = entry;
c++;
@@ -134,6 +143,10 @@ public class LogFileAlterationObserver extends
FileAlterationObserver {
parent.setChildren(current);
}
+ private void doMatch(final FileEntry entry, final File file) {
+ entry.refresh(file);
+ }
+
private File[] listFiles(File file) {
File[] children = null;
if (file.isDirectory()) {
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
index ab56b129..c8a5cc60 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/DefaultFileMonitorListener.java
@@ -22,23 +22,22 @@ import cn.hutool.core.io.FileUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.monitor.FileAlterationMonitor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.agent.channel.AbstractChannelService;
import org.apache.ozhera.log.agent.channel.ChannelService;
+import org.apache.ozhera.log.agent.channel.WildcardChannelServiceImpl;
import org.apache.ozhera.log.agent.channel.file.FileMonitor;
import org.apache.ozhera.log.agent.channel.file.MonitorFile;
import org.apache.ozhera.log.agent.common.ChannelUtil;
import org.apache.ozhera.log.agent.common.ExecutorUtil;
import org.apache.ozhera.log.api.enums.LogTypeEnum;
import org.apache.ozhera.log.common.PathUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.monitor.FileAlterationMonitor;
-import org.apache.commons.lang3.StringUtils;
import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@@ -65,9 +64,9 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
*/
List<String> pathList = new CopyOnWriteArrayList<>();
/**
- * Actual listener list
+ * Actual listener map
*/
- private List<FileAlterationMonitor> monitorList = new
CopyOnWriteArrayList();
+ private final Map<Long, List<FileAlterationMonitor>> monitorMap = new
HashMap<>();
/**
* Each listening thread
*/
@@ -75,11 +74,13 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
/**
* Each ChannelService corresponds to the monitored file
*/
- Map<List<MonitorFile>, ChannelService> pathChannelServiceMap = new
ConcurrentHashMap<>();
+ private Map<List<MonitorFile>, ChannelService> pathChannelServiceMap = new
ConcurrentHashMap<>();
private final List<String> specialFileNameSuffixList =
Lists.newArrayList("wf");
- private static final int DEFAULT_FILE_SIZE = 100000;
+ private static final int DEFAULT_FILE_SIZE = 9000;
+
+ private static final long DEFAULT_CHANNEL_ID = 0L;
public DefaultFileMonitorListener() {
//Check if there are too many files, if there are more than 50,000
files,
@@ -87,7 +88,8 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
long size = getDefaultFileSize();
log.info("defaultMonitorPath:{} file size:{}", defaultMonitorPath,
size);
if (size < DEFAULT_FILE_SIZE) {
- this.startFileMonitor(defaultMonitorPath);
+ monitorMap.putIfAbsent(DEFAULT_CHANNEL_ID, new ArrayList<>());
+ this.startFileMonitor(DEFAULT_CHANNEL_ID, defaultMonitorPath);
pathList.add(defaultMonitorPath);
}
}
@@ -110,15 +112,17 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
}
@Override
- public void addChannelService(ChannelService channelService) {
+ public void addChannelService(AbstractChannelService channelService) {
List<MonitorFile> monitorPathList =
channelService.getMonitorPathList();
if (CollectionUtils.isEmpty(monitorPathList)) {
return;
}
+ Long channelId = channelService.getChannelDefine().getChannelId();
+ monitorMap.putIfAbsent(channelId, new ArrayList<>());
List<String> newMonitorDirectories =
newMonitorDirectories(monitorPathList);
for (String watchDirectory : newMonitorDirectories) {
if (isValidWatch(watchDirectory)) {
- startFileMonitor(watchDirectory);
+ startFileMonitor(channelId, watchDirectory);
pathList.add(watchDirectory);
}
}
@@ -171,35 +175,46 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
}
}
newMonitorDirectories =
newMonitorDirectories.stream().distinct().collect(Collectors.toList());
- log.info("end newMonitorDirectories:",
gson.toJson(newMonitorDirectories));
+ log.info("end newMonitorDirectories:{}",
gson.toJson(newMonitorDirectories));
return newMonitorDirectories;
}
@Override
- public void removeChannelService(ChannelService channelService) {
+ public void removeChannelService(AbstractChannelService channelService) {
try {
+ if (channelService instanceof WildcardChannelServiceImpl) {
+ return;
+ }
pathChannelServiceMap.remove(channelService.getMonitorPathList());
List<MonitorFile> monitorPathList =
channelService.getMonitorPathList();
List<String> newMonitorDirectories =
newMonitorDirectories(monitorPathList);
for (String watchDirectory : newMonitorDirectories) {
+ log.info("remove watchDirectory:{}", watchDirectory);
pathList.remove(watchDirectory);
if (scheduledFutureMap.containsKey(watchDirectory)) {
scheduledFutureMap.get(watchDirectory).cancel(true);
}
}
+ List<FileAlterationMonitor> fileAlterationMonitors =
monitorMap.get(channelService.getChannelDefine().getChannelId());
+ if (CollectionUtils.isNotEmpty(fileAlterationMonitors)) {
+ for (FileAlterationMonitor fileAlterationMonitor :
fileAlterationMonitors) {
+ fileAlterationMonitor.stop();
+ }
+ }
} catch (Exception e) {
log.error("removeChannelService file listener,monitorPathList:{}",
gson.toJson(channelService.getMonitorPathList()), e);
}
}
- public void startFileMonitor(String monitorFilePath) {
+ public void startFileMonitor(Long channelId, String monitorFilePath) {
log.debug("startFileMonitor,monitorFilePath:{}", monitorFilePath);
if (pathList.stream().anyMatch(monitorFilePath::startsWith)) {
log.info("current path has
started,monitorFilePath:{},pathList:{}", monitorFilePath,
String.join(SYMBOL_COMMA, pathList));
return;
}
+ List<FileAlterationMonitor> fileAlterationMonitors =
monitorMap.get(channelId);
Future<?> fileMonitorFuture = ExecutorUtil.submit(() -> {
- new FileMonitor().watch(monitorFilePath, monitorList,
changedFilePath -> {
+ new FileMonitor().watch(monitorFilePath, fileAlterationMonitors,
changedFilePath -> {
try {
if (FileUtil.isDirectory(changedFilePath)) {
return;
@@ -215,14 +230,13 @@ public class DefaultFileMonitorListener implements
FileMonitorListener {
log.error("FileMonitor
error,monitorFilePath:{},changedFilePath:{}", monitorFilePath, changedFilePath,
e);
}
});
+ monitorMap.put(channelId, fileAlterationMonitors);
});
scheduledFutureMap.put(monitorFilePath, fileMonitorFuture);
}
/**
* Normal file change event handling
- *
- * @param changedFilePath
*/
private void ordinaryFileChanged(String changedFilePath) {
for (Map.Entry<List<MonitorFile>, ChannelService> channelServiceEntry
: pathChannelServiceMap.entrySet()) {
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
index ea8726ac..4512342f 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/listener/FileMonitorListener.java
@@ -18,7 +18,7 @@
*/
package org.apache.ozhera.log.agent.channel.listener;
-import org.apache.ozhera.log.agent.channel.ChannelService;
+import org.apache.ozhera.log.agent.channel.AbstractChannelService;
/**
* @author wtt
@@ -30,10 +30,10 @@ public interface FileMonitorListener {
/**
* Add
*/
- void addChannelService(ChannelService channelService);
+ void addChannelService(AbstractChannelService channelService);
/**
* Delete
*/
- void removeChannelService(ChannelService channelService);
+ void removeChannelService(AbstractChannelService channelService);
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
index 0a5523b1..b8c409ec 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/memory/ChannelMemory.java
@@ -34,7 +34,7 @@ import java.util.Map;
@Data
public class ChannelMemory implements Serializable {
- public transient static final String DEFAULT_VERSION = "2.0";
+ public static final String DEFAULT_VERSION = "2.0";
private Long channelId;
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
index bce4362a..27a0f1a3 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
@@ -96,7 +96,7 @@ public class ChannelUtil {
return unixFileNode;
}
} catch (Exception e) {
- log.info("buildUnixFileNode error,filePath:{}", filePath, e);
+ log.error("buildUnixFileNode error,filePath:{}", filePath, e);
}
return new ChannelMemory.UnixFileNode();
}
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
index c7c78ffe..a9e94da3 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ExecutorUtil.java
@@ -47,9 +47,7 @@ public class ExecutorUtil {
public static ExecutorService createPool() {
System.setProperty("jdk.virtualThreadScheduler.parallelism",
String.valueOf(Runtime.getRuntime().availableProcessors() + 1));
ThreadFactory factory =
Thread.ofVirtual().name("ExecutorUtil-TP-Virtual-Thread", 0)
- .uncaughtExceptionHandler((t, e) -> {
- log.error("ExecutorUtil-TP-Virtual-Thread
uncaughtException:{}", e.getMessage(), e);
- }).factory();
+ .uncaughtExceptionHandler((t, e) ->
log.error("ExecutorUtil-TP-Virtual-Thread uncaughtException:{}",
e.getMessage(), e)).factory();
return Executors.newThreadPerTaskExecutor(factory);
}
diff --git
a/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
index 2e61fd89..902d0fcc 100644
---
a/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
+++
b/ozhera-log/log-agent/src/test/java/org/apache/ozhera/log/agent/FilterMonitorTest.java
@@ -20,10 +20,11 @@ package org.apache.ozhera.log.agent;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
-import org.apache.ozhera.log.agent.channel.file.FileListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.monitor.FileAlterationMonitor;
import org.apache.commons.io.monitor.FileAlterationObserver;
+import org.apache.ozhera.log.agent.channel.file.FileListener;
+import org.apache.ozhera.log.agent.channel.file.LogFileAlterationObserver;
import org.junit.Test;
import java.io.File;
@@ -74,16 +75,17 @@ public class FilterMonitorTest {
// continue;
// }
- FileAlterationObserver observer = new FileAlterationObserver(new
File(watch));
+// FileAlterationObserver observer = new FileAlterationObserver(new
File(watch));
+ FileAlterationObserver observer = new
LogFileAlterationObserver(new File(watch), file -> file.exists() &&
file.isFile());
observer.addListener(new FileListener(consumer));
- log.info("## agent monitor file:{}, filePattern:{}", watch);
+ log.info("## agent monitor file:{}", watch);
monitor.addObserver(observer);
}
try {
monitor.start();
- log.info("## agent monitor filePattern:{} started");
+ log.info("## agent monitor started");
} catch (Exception e) {
log.error(String.format("agent file monitor start err,monitor
filePattern:%s"), e);
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
index 5dbf8aea..b25f85ec 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/LogtailConfig.java
@@ -32,6 +32,7 @@ public class LogtailConfig {
private String ak;
private String sk;
private String clusterInfo;
+ @EqualsAndHashCode.Exclude
private String consumerGroup;
private String topic;
private String tag;
@@ -41,6 +42,8 @@ public class LogtailConfig {
private String deploySpace;
private Integer parseType;
+
+ @EqualsAndHashCode.Exclude
private String tail;
/**
* Log delimiter
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
index ce991a0e..4ca08b0c 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/model/SinkConfig.java
@@ -30,6 +30,7 @@ import java.util.List;
@EqualsAndHashCode
public class SinkConfig {
private Long logstoreId;
+ @EqualsAndHashCode.Exclude
private String logstoreName;
/**
* timestamp is required
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
index 09acb66c..50ad6a3f 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/AbstractLogParser.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.utils.IndexUtils;
+import org.apache.ozhera.log.utils.NetUtil;
import java.util.*;
import java.util.function.Function;
@@ -43,6 +44,8 @@ public abstract class AbstractLogParser implements LogParser {
private List<FieldInterceptor> fieldInterceptors = Lists.newArrayList();
+ private String localIp = NetUtil.getLocalIp();
+
protected Map<String, Integer> valueMap;
public AbstractLogParser(LogParserData parserData) {
@@ -88,13 +91,20 @@ public abstract class AbstractLogParser implements
LogParser {
validRet(parseData, logData);
fieldInterceptors.forEach(fieldInterceptor ->
fieldInterceptor.postProcess(parseData));
+ addCommonData(parseData);
+
return parseData;
}
+ private void addCommonData(Map<String, Object> data) {
+ data.putIfAbsent(parse_time, System.currentTimeMillis());
+ data.putIfAbsent(parse_ip, localIp);
+ }
+
@Override
public Map<String, Object> parseSimple(String logData, Long collectStamp) {
Map<String, Object> parseData = doParseSimple(logData, collectStamp);
- fieldInterceptors.stream().forEach(fieldInterceptor ->
fieldInterceptor.postProcess(parseData));
+ fieldInterceptors.forEach(fieldInterceptor ->
fieldInterceptor.postProcess(parseData));
return parseData;
}
@@ -133,7 +143,7 @@ public abstract class AbstractLogParser implements
LogParser {
void wrapMap(Map<String, Object> ret, LogParserData parserData, String ip,
Long lineNum, String fileName, Long collectStamp) {
- ret.putIfAbsent(esKeyMap_timestamp, null == collectStamp ?
getTimestampFromString("", collectStamp) : collectStamp);
+ ret.putIfAbsent(esKeyMap_timestamp, null == collectStamp ?
System.currentTimeMillis() : collectStamp);
ret.put(esKeyMap_topic, parserData.getTopicName());
ret.put(esKeyMap_tag, parserData.getMqTag());
ret.put(esKeyMap_logstoreName, parserData.getLogStoreName());
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
index a74b51ce..6352e404 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/CustomLogParser.java
@@ -21,10 +21,10 @@ package org.apache.ozhera.log.parse;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.google.gson.Gson;
-import org.apache.ozhera.log.utils.IndexUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.utils.IndexUtils;
import java.util.*;
import java.util.stream.Collectors;
@@ -89,16 +89,10 @@ public class CustomLogParser extends AbstractLogParser {
if
(ret.values().stream().map(String::valueOf).anyMatch(StringUtils::isEmpty)) {
ret.put(ES_KEY_MAP_LOG_SOURCE, originLog);
}
- /**
- * Pocket does not include esKeyMap_timestamp, esKeyMap_topic,
esKeyMap_tag, esKeyMap_logstoreName
- */
- if (ret.containsKey(esKeyMap_timestamp)) {
- Long time =
getTimestampFromString(ret.get(esKeyMap_timestamp).toString(), collectStamp);
- ret.put(esKeyMap_timestamp, time);
- }
} catch (Exception e) {
ret.put(ES_KEY_MAP_LOG_SOURCE, originData);
}
+ validTimestamp(ret, collectStamp);
return ret;
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
index 9e744c11..74b0943f 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/JsonLogParser.java
@@ -61,25 +61,31 @@ public class JsonLogParser extends AbstractLogParser {
try {
// Map<String, Object> rawLogMap = GSON.fromJson(logData,
token.getType());
Map<String, Object> rawLogMap = flattenJson(logData);
- // The complete set of index column names
- List<String> keyNameList =
IndexUtils.getKeyListSlice(parserData.getKeyList());
- // An index subset that marks whether the index column name at the
corresponding location is referenced in the current tail
- int[] valueIndexList =
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
- for (int i = 0; i < keyNameList.size(); i++) {
- // Skip unreferenced keys
- if (i >= valueIndexList.length || valueIndexList[i] == -1) {
- continue;
+ if (null != valueMap && !valueMap.isEmpty()) {
+ for (String key : valueMap.keySet()) {
+ ret.put(key, rawLogMap.getOrDefault(key, ""));
+ }
+ } else {
+ // The complete set of index column names
+ List<String> keyNameList =
IndexUtils.getKeyListSlice(parserData.getKeyList());
+ // An index subset that marks whether the index column name at
the corresponding location is referenced in the current tail
+ int[] valueIndexList =
Arrays.stream(parserData.getValueList().split(",")).mapToInt(Integer::parseInt).toArray();
+ for (int i = 0; i < keyNameList.size(); i++) {
+ // Skip unreferenced keys
+ if (i >= valueIndexList.length || valueIndexList[i] == -1)
{
+ continue;
+ }
+ String currentKey = keyNameList.get(i);
+ String value = rawLogMap.getOrDefault(currentKey,
"").toString();
+ ret.put(currentKey, StringUtils.isNotEmpty(value) ?
value.trim() : value);
}
- String currentKey = keyNameList.get(i);
- String value = rawLogMap.getOrDefault(currentKey,
"").toString();
- ret.put(currentKey, StringUtils.isNotEmpty(value) ?
value.trim() : value);
}
- //timestamp
- validTimestamp(ret, collectStamp);
} catch (Exception e) {
// If an exception occurs, the original log is kept to the
logsource field
ret.put(ES_KEY_MAP_LOG_SOURCE, logData);
}
+ //timestamp
+ validTimestamp(ret, collectStamp);
return ret;
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
index 946bbdae..3db5b237 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParser.java
@@ -21,10 +21,12 @@ package org.apache.ozhera.log.parse;
import cn.hutool.core.date.DateUtil;
import org.apache.commons.lang3.time.DateParser;
import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.ozhera.log.utils.DateUtils;
import java.time.Instant;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* @Author: wtt
@@ -49,6 +51,8 @@ public interface LogParser {
String specialTimePrefix = "20";
String esKeyMap_timestamp = "timestamp";
+ String parse_time = "parse_time";
+ String parse_ip = "parse_ip";
String esKeyMap_Date = "Date";
String esKeyMap_topic = "mqtopic";
String esKeyMap_tag = "mqtag";
@@ -79,11 +83,11 @@ public interface LogParser {
default Long getTimestampFromString(String logTime, Long collectStamp) {
Long timeStamp;
try {
- timeStamp = DateUtil.parse(logTime).getTime();
+ timeStamp =
Objects.requireNonNull(DateUtils.parse(logTime)).getTime();
} catch (Exception e) {
try {
logTime = String.format("%s%s",
String.valueOf(DateUtil.thisYear()).substring(0, 2), logTime);
- timeStamp = DateUtil.parse(logTime).getTime();
+ timeStamp = DateUtils.parse(logTime).getTime();
} catch (Exception ex) {
timeStamp = collectStamp;
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
index 6e4ea33f..0c0a634e 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/LogParserFactory.java
@@ -34,10 +34,6 @@ public class LogParserFactory {
private LogParserFactory() {
}
- public static LogParser getLogParser(Integer parseType, String keyList,
String valueList, String parseScript, String keyOrderList) {
- return LogParserFactory.getLogParser(parseType, keyList, valueList,
parseScript, "", "", "", "", keyOrderList);
- }
-
public static LogParser getLogParser(Integer parseType, String keyList,
String valueList, String parseScript,
String topicName, String tailName,
String mqTag, String logStoreName, String keyOrderList) {
LogParserData logParserData = LogParserData.builder().keyList(keyList)
@@ -68,6 +64,10 @@ public class LogParserFactory {
}
}
+ public static LogParser getLogParser(Integer parseType, String keyList,
String valueList, String parseScript, String keyOrderList) {
+ return LogParserFactory.getLogParser(parseType, keyList, valueList,
parseScript, "", "", "", "", keyOrderList);
+ }
+
@Getter
public enum LogParserEnum {
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
index 0baa3a9f..0a630f96 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/SeparatorLogParser.java
@@ -20,12 +20,9 @@ package org.apache.ozhera.log.parse;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.ozhera.log.utils.IndexUtils;
import java.util.*;
-import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
/**
* @author wtt
@@ -128,8 +125,7 @@ public class SeparatorLogParser extends AbstractLogParser {
value = "";
}
if (kTsplit[0].equals(esKeyMap_timestamp) ||
kTsplit[1].equalsIgnoreCase(esKeyMap_Date)) {
- Long time = getTimestampFromString(value,
collectStamp);
- ret.put(esKeyMap_timestamp, time);
+ ret.put(esKeyMap_timestamp, value);
} else {
ret.put(kTsplit[0], StringUtils.isNotEmpty(value) ?
value.trim() : value);
}
@@ -146,6 +142,7 @@ public class SeparatorLogParser extends AbstractLogParser {
} catch (Exception e) {
ret.put(ES_KEY_MAP_LOG_SOURCE, logData);
}
+ validTimestamp(ret, collectStamp);
return ret;
}
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
index 8dd1019a..d4dcfe27 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/utils/DateUtils.java
@@ -18,6 +18,16 @@
*/
package org.apache.ozhera.log.utils;
+import cn.hutool.core.date.CalendarUtil;
+import cn.hutool.core.date.DateException;
+import cn.hutool.core.date.DatePattern;
+import cn.hutool.core.date.DateTime;
+import cn.hutool.core.date.format.DateParser;
+import cn.hutool.core.lang.PatternPool;
+import cn.hutool.core.util.CharUtil;
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.ReUtil;
+import cn.hutool.core.util.StrUtil;
import org.apache.commons.lang3.StringUtils;
import java.text.ParseException;
@@ -25,18 +35,40 @@ import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.Calendar;
+import java.time.format.DateTimeParseException;
import java.util.Date;
+import java.util.List;
+import java.util.Optional;
+
+import static cn.hutool.core.date.DateUtil.*;
/**
* @author wtt
* @version 1.0
* @description
- * @date 2021/9/29 10:30
+ * @date 2024/12/29 12:40
*/
-public class DateUtils {
+public class DateUtils extends CalendarUtil {
public static long dayms = 86400000L;
+ private final static String[] wtb = {
+ "sun", "mon", "tue", "wed", "thu", "fri", "sat",
+ "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep",
"oct", "nov", "dec",
+ "gmt", "ut", "utc", "est", "edt", "cst", "cdt", "mst", "mdt",
"pst", "pdt"
+ };
+
+ public static String getDaysAgo(int n) {
+ return new
SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis() - n * dayms);
+ }
+
+ public static String timeStamp2Date(String millSeconds, String format) {
+ if (StringUtils.isBlank(millSeconds)) {
+ return "";
+ }
+ if (format == null || format.isEmpty()) format = "yyyy-MM-dd HH:mm:ss";
+ SimpleDateFormat sdf = new SimpleDateFormat(format);
+ return sdf.format(new Date(Long.valueOf(millSeconds)));
+ }
public static String getTime() {
DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy.MM.dd");
@@ -53,77 +85,115 @@ public class DateUtils {
return tomorrow.format(formatter);
}
- /**
- * Get the first millisecond today
- *
- * @return
- */
- public static long getTodayFirstMillisecond() {
- Date date = new Date();
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(date);
- date = calendar.getTime();
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ public static Long getThisDayFirstMillisecond(String thisDay) {
+ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
+ Date date = null;
try {
- date = sdf.parse(sdf.format(date));
+ date = format.parse(thisDay);
} catch (ParseException e) {
e.printStackTrace();
}
return date.getTime();
}
- /**
- * Get yesterday's first millisecond
- *
- * @return]
- */
- public static long getYesterdayFirstMillisecond() {
- return getTodayFirstMillisecond() - dayms;
- }
+ private static String normalize(CharSequence dateStr) {
+ if (StrUtil.isBlank(dateStr)) {
+ return StrUtil.str(dateStr);
+ }
- /**
- * Get the first millisecond of the day before yesterday
- *
- * @return]
- */
- public static long getBeforeYesterdayFirstMillisecond() {
- return getTodayFirstMillisecond() - dayms * 2;
- }
+ final List<String> dateAndTime = StrUtil.splitTrim(dateStr, ' ');
+ final int size = dateAndTime.size();
+ if (size < 1 || size > 2) {
+ return StrUtil.str(dateStr);
+ }
- /**
- * Gets the first millisecond of the day
- *
- * @param thisDay
- * @return
- */
- public static Long getThisDayFirstMillisecond(String thisDay) {
- SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
- Date date = null;
- try {
- date = format.parse(thisDay);
- } catch (ParseException e) {
- e.printStackTrace();
+ final StringBuilder builder = StrUtil.builder();
+
+ String datePart = dateAndTime.get(0).replaceAll("[/.年月]", "-");
+ datePart = StrUtil.removeSuffix(datePart, "日");
+ builder.append(datePart);
+
+ if (size == 2) {
+ builder.append(' ');
+ String timePart = dateAndTime.get(1).replaceAll("[时分秒]", ":");
+ timePart = StrUtil.removeSuffix(timePart, ":");
+ timePart = timePart.replace(',', '.');
+ builder.append(timePart);
}
- return date.getTime();
+
+ return builder.toString();
}
- /**
- * Get n days ago date (yyyy-MM-dd)
- *
- * @param n
- * @return
- */
- public static String getDaysAgo(int n) {
- return new
SimpleDateFormat("yyyy-MM-dd").format(System.currentTimeMillis() - n * dayms);
+ public static DateTime parse(CharSequence dateStr, DateParser parser) {
+ return new DateTime(dateStr, parser);
}
- public static String timeStamp2Date(String millSeconds, String format) {
- if (StringUtils.isBlank(millSeconds)) {
- return "";
+ public static DateTime parse(CharSequence dateCharSequence) {
+ if (StrUtil.isBlank(dateCharSequence)) {
+ return null;
}
- if (format == null || format.isEmpty()) format = "yyyy-MM-dd HH:mm:ss";
- SimpleDateFormat sdf = new SimpleDateFormat(format);
- return sdf.format(new Date(Long.valueOf(millSeconds)));
+ String dateStr = dateCharSequence.toString();
+ dateStr = StrUtil.removeAll(dateStr.trim(), '日', '秒');
+ int length = dateStr.length();
+ if (NumberUtil.isNumber(dateStr)) {
+ if (length == DatePattern.PURE_DATETIME_PATTERN.length()) {
+ return parse(dateStr, DatePattern.PURE_DATETIME_FORMAT);
+ } else if (length ==
DatePattern.PURE_DATETIME_MS_PATTERN.length()) {
+ return parse(dateStr, DatePattern.PURE_DATETIME_MS_FORMAT);
+ } else if (length == DatePattern.PURE_DATE_PATTERN.length()) {
+ return parse(dateStr, DatePattern.PURE_DATE_FORMAT);
+ } else if (length == DatePattern.PURE_TIME_PATTERN.length()) {
+ return parse(dateStr, DatePattern.PURE_TIME_FORMAT);
+ }
+ } else if (ReUtil.isMatch(PatternPool.TIME, dateStr)) {
+ return parseTimeToday(dateStr);
+ } else if (StrUtil.containsAnyIgnoreCase(dateStr, wtb)) {
+ return parseCST(dateStr);
+ } else if (StrUtil.contains(dateStr, 'T')) {
+ // UTC时间
+ return parseUTC(dateStr);
+ }
+
+ dateStr = normalize(dateStr);
+// if (ReUtil.isMatch(DatePattern.REGEX_NORM, dateStr)) {
+ if (parseDateTime(dateStr).isPresent()) {
+ final int colonCount = StrUtil.count(dateStr, CharUtil.COLON);
+ switch (colonCount) {
+ case 0:
+ return parse(dateStr, DatePattern.NORM_DATE_FORMAT);
+ case 1:
+ return parse(dateStr,
DatePattern.NORM_DATETIME_MINUTE_FORMAT);
+ case 2:
+ final int indexOfDot = StrUtil.indexOf(dateStr,
CharUtil.DOT);
+ if (indexOfDot > 0) {
+ final int length1 = dateStr.length();
+ if (length1 - indexOfDot > 4) {
+ dateStr = StrUtil.subPre(dateStr, indexOfDot + 4);
+ }
+ return parse(dateStr,
DatePattern.NORM_DATETIME_MS_FORMAT);
+ }
+ return parse(dateStr, DatePattern.NORM_DATETIME_FORMAT);
+ }
+ }
+ throw new DateException("No format fit for date String [{}] !",
dateStr);
}
+ private static final List<DateTimeFormatter> FORMATTERS = List.of(
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SS"),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm"),
+ DateTimeFormatter.ofPattern("yyyy-MM-dd")
+ );
+
+ public static Optional<LocalDateTime> parseDateTime(String dateStr) {
+ for (DateTimeFormatter formatter : FORMATTERS) {
+ try {
+ return Optional.of(LocalDateTime.parse(dateStr, formatter));
+ } catch (DateTimeParseException ignored) {
+ }
+ }
+ return Optional.empty();
+ }
}
diff --git
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
index c6ab6ba4..e2867cd3 100644
---
a/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
+++
b/ozhera-log/log-common/src/test/java/org/apache/ozhera/log/common/LogParserTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.ozhera.log.common;
+import cn.hutool.core.date.DateUtil;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateParser;
@@ -47,7 +48,7 @@ public class LogParserTest {
@Test
public void test1() {
Stopwatch stopwatch = Stopwatch.createStarted();
- String keyList =
"timestamp:date,podName:keyword,level:keyword,threadName:text,className:text,line:keyword,methodName:keyword,traceId:keyword,message:text,ip:ip,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+ String keyList =
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
String valueList =
"0,-1,16,-1,-1,-1,-1,-1,-1,1,2,3,4,5,6,7,8,9,10,11,13,12,17,14,15";
String parseScript = "|";
String logData = "";
@@ -66,14 +67,14 @@ public class LogParserTest {
@Test
public void test2() {
Stopwatch stopwatch = Stopwatch.createStarted();
- String keyList =
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,time:keyword,log_level:keyword,thread_name:keyword,log_name:keyword,trace_id:keyword,user_login_name:keyword,marker:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
- String keyOrderList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,time:1,log_level:1,thread_name:1,log_name:1,trace_id:1,user_login_name:1,marker:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
- String valueList = "-1,7,0,1,2,3,4,5,6";
- String parseScript =
"(?s)(?s)(?s)(?s)(\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\s\\+\\d{4})\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s\\[(.*?)\\]\\s([\\s\\S]*)";
- String logData = "";
+ String keyList =
"timestamp:date,level:keyword,traceId:keyword,threadName:text,className:text,line:keyword,methodName:keyword,message:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+ String keyOrderList =
"timestamp:1,level:1,traceId:1,threadName:1,className:1,line:1,methodName:1,message:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3";
+ String valueList = "0,1,2,3,4,5,6,7";
+ String parseScript = "|";
+ String logData =
"{\"lineNumber\":142713,\"fileName\":\"/home/work/log/nr-trade-pay-992063-585c564f9b-2b8jq/pay/server.log\",\"pointer\":46974955,\"msgBody\":\"2025-03-04
15:32:12,412|DEBUG|ee839f4c4c5a9a8788fa54929f6ef20f|DubboServerHandler-10.159.32.249:20880-thread-797|c.x.n.p.i.r.d.m.L.selectLoanCreditList|143|==>
Parameters:
5255102265996170(String)\",\"extMap\":{\"ct\":\"1741073532486\",\"ip\":\"10.159.32.249\",\"tag\":\"tags_60006_48_96833\",\"type\":\"1\"}}";
String ip = "127.0.0.1";
Long currentStamp = Instant.now().toEpochMilli();
- Integer parserType =
LogParserFactory.LogParserEnum.REGEX_PARSE.getCode();
+ Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
LogParser customParse = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, topicName, tailName, tag, logStoreName,
keyOrderList);
Map<String, Object> parse = customParse.parse(logData, ip, 1l,
currentStamp, "");
System.out.println(parse);
@@ -97,23 +98,31 @@ public class LogParserTest {
Map<String, Object> parse = customParse.parse(logData, ip, 1l,
currentStamp, "");
System.out.println(parse);
- System.out.println(customParse.getTimestampFromString("2023-08-25
10:46:09.239", currentStamp));
+ System.out.println(customParse.getTimestampFromString("2025-02-26
19:25:00,35", currentStamp));
stopwatch.stop();
log.info("cost time:{}", stopwatch.elapsed().toMillis());
}
@Test
public void parseSimpleTest() {
+ Stopwatch stopwatch = Stopwatch.createStarted();
Integer parserType =
LogParserFactory.LogParserEnum.SEPARATOR_PARSE.getCode();
- String keyList =
"timestamp:date,mqtopic:keyword,mqtag:keyword,logstore:keyword,logsource:keyword,message:text,tail:keyword,logip:keyword,linenumber:long,filename:keyword,datetime:date,project_name:keyword,client_ip:keyword,level:keyword,log_id:keyword,url:keyword,up_ip:keyword,logger_line:keyword,thread:keyword,biz_id:keyword,tailId:integer,spaceId:integer,storeId:integer,deploySpace:keyword";
- String leyOrderList =
"timestamp:1,mqtopic:3,mqtag:3,logstore:3,logsource:3,message:1,tail:3,logip:3,linenumber:3,filename:3,datetime:1,project_name:1,client_ip:1,level:1,log_id:1,url:1,up_ip:1,logger_line:1,thread:1,biz_id:1,tailId:3,spaceId:3,storeId:3,deploySpace:3";
- String valueList = "-1,10,0,1,2,3,4,5,6,7,8,9";
- String parseScript = "]|[";
- String message = "2025-02-12T20:11:02.501+0800]";
+ String keyList =
"timestamp:date,level:keyword,threadName:text,traceId:keyword,className:text,line:keyword,message:keyword,methodName:keyword,logstore:keyword,logsource:keyword,mqtopic:keyword,mqtag:keyword,logip:keyword,tail:keyword,linenumber:long";
+ String leyOrderList =
"timestamp:1,level:1,threadName:1,traceId:1,className:1,line:1,message:1,methodName:1,logstore:3,logsource:3,mqtopic:3,mqtag:3,logip:3,tail:3,linenumber:3";
+ String valueList = "0,1,2,3,4,5,6,-1";
+ String parseScript = "|";
+ String message = "2025-03-03 10:03:34,128|INFO
|DubboServerHandler-10.157.62.39:20880-thread-203|afb5a702e39130c59f89198d83026ace|c.x.n.p.a.pool.CarActivitySearcher|?|car
activity search result:[]";
Long collectStamp = Instant.now().toEpochMilli();
LogParser logParser = LogParserFactory.getLogParser(parserType,
keyList, valueList, parseScript, leyOrderList);
- Map<String, Object> parseMsg = logParser.parseSimple(message,
collectStamp);
- Assert.assertNotNull(parseMsg);
+ for (int i = 0; i < 10000; i++) {
+ Map<String, Object> parseMsg = logParser.parse(message, "", 1l,
collectStamp, "");
+
+// System.out.println(logParser.getTimestampFromString("2025-02-26
19:25:00,35", System.currentTimeMillis()));
+ System.out.println(DateUtil.parse("2025-02-26
19:25:00,35").getTime());
+ Assert.assertNotNull(parseMsg);
+ }
+ stopwatch.stop();
+ log.info("cost time:{}", stopwatch.elapsed().toMillis());
}
@Test
diff --git
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
index 601c8e4c..8c5a07cf 100644
---
a/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
+++
b/ozhera-log/log-manager/src/main/java/org/apache/ozhera/log/manager/service/impl/EsDataServiceImpl.java
@@ -22,8 +22,15 @@ import cn.hutool.core.date.DateUtil;
import cn.hutool.core.lang.Pair;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
-import org.apache.ozhera.app.api.model.HeraAppEnvData;
import com.xiaomi.mone.es.EsClient;
+import com.xiaomi.youpin.docean.Ioc;
+import com.xiaomi.youpin.docean.anno.Service;
+import com.xiaomi.youpin.docean.common.StringUtils;
+import com.xiaomi.youpin.docean.plugin.config.anno.Value;
+import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
+import com.xiaomi.youpin.docean.plugin.es.EsService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.ozhera.app.api.model.HeraAppEnvData;
import org.apache.ozhera.log.api.enums.LogStorageTypeEnum;
import org.apache.ozhera.log.api.model.dto.TraceLogDTO;
import org.apache.ozhera.log.api.model.vo.TraceLogQuery;
@@ -58,20 +65,15 @@ import
org.apache.ozhera.log.manager.service.HeraAppEnvService;
import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionService;
import
org.apache.ozhera.log.manager.service.extension.common.CommonExtensionServiceFactory;
import org.apache.ozhera.log.parse.LogParser;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.common.StringUtils;
-import com.xiaomi.youpin.docean.plugin.config.anno.Value;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
-import com.xiaomi.youpin.docean.plugin.es.EsService;
-import lombok.extern.slf4j.Slf4j;
import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
@@ -188,7 +190,7 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
if (LogStorageTypeEnum.DORIS == storageTypeEnum) {
return dorisDataQuery(logQuery, milogLogstoreDO, dto);
} else {
- return elasticDataQuery(milogLogstoreDO, logQuery, dto,
keyList, stopWatch);
+ return elasticDataQuery(milogLogstoreDO, logQuery, dto,
keyList, stopWatch, operator);
}
} catch (Throwable e) {
log.error("Log query error, log search
error,logQuery:[{}],user:[{}]", logQuery, MoneUserContext.getCurrentUser(), e);
@@ -215,7 +217,7 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
}
private Result<LogDTO> elasticDataQuery(MilogLogStoreDO milogLogstoreDO,
LogQuery logQuery,
- LogDTO dto, List<String> keyList,
StopWatch stopWatch) throws IOException {
+ LogDTO dto, List<String> keyList,
StopWatch stopWatch, String operator) throws IOException {
EsService esService =
esCluster.getEsService(milogLogstoreDO.getEsClusterId());
String esIndexName =
commonExtensionService.getSearchIndex(logQuery.getStoreId(),
milogLogstoreDO.getEsIndex());
@@ -228,6 +230,12 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
SearchSourceBuilder builder = assembleSearchSourceBuilder(logQuery,
keyList, boolQueryBuilder);
SearchRequest searchRequest = new SearchRequest(new
String[]{esIndexName}, builder);
+
+ boolean isTimestampMissing = isTimestampMissingInQuery(searchRequest);
+ if (isTimestampMissing) {
+ log.warn("searchRequest is missing timestamp field, add timestamp
field,logQuery:{}, operator:{}", GSON.toJson(logQuery), operator);
+ }
+
// query
stopWatch.start("search-query");
SearchResponse searchResponse = esService.search(searchRequest);
@@ -235,7 +243,7 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
dto.setSourceBuilder(builder);
if (stopWatch.getLastTaskTimeMillis() > 7 * 1000) {
- log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}",
stopWatch.getLastTaskName(), stopWatch.getLastTaskTimeMillis());
+ log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}",
stopWatch.getLastTaskName(), stopWatch.getLastTaskTimeMillis(),
GSON.toJson(logQuery));
}
//Result transformation
stopWatch.start("data-assemble");
@@ -243,11 +251,82 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
stopWatch.stop();
if (stopWatch.getTotalTimeMillis() > 15 * 1000) {
- log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}", "gt15s",
stopWatch.getTotalTimeMillis());
+ log.warn("##LONG-COST-QUERY##{} cost:{} ms, msg:{}", "gt15s",
stopWatch.getTotalTimeMillis(), GSON.toJson(logQuery));
}
return Result.success(dto);
}
+ public static boolean isTimestampMissingInQuery(SearchRequest
searchRequest) {
+ if (searchRequest == null) {
+ return true;
+ }
+
+ SearchSourceBuilder sourceBuilder = searchRequest.source();
+ if (sourceBuilder == null || sourceBuilder.query() == null) {
+ return true;
+ }
+
+ // Check whether the query condition contains the timestamp field
+ if (sourceBuilder.query() instanceof BoolQueryBuilder) {
+ BoolQueryBuilder boolQuery = (BoolQueryBuilder)
sourceBuilder.query();
+ return !containsTimestampField(boolQuery);
+ }
+
+ return true;
+ }
+
+ /**
+ * Check if the timestamp field is included in the BoolQueryBuilder
+ *
+ * @param boolQuery BoolQueryBuilder to be checked
+ * @return Return true if the timestamp field is included; otherwise
return false
+ */
+ public static boolean containsTimestampField(BoolQueryBuilder boolQuery) {
+ // check must conditions
+ for (QueryBuilder query : boolQuery.must()) {
+ if (isTimestampFieldInQuery(query)) {
+ return true;
+ }
+ }
+
+ // check filter conditions
+ for (QueryBuilder query : boolQuery.filter()) {
+ if (isTimestampFieldInQuery(query)) {
+ return true;
+ }
+ }
+
+ // check the should conditions
+ for (QueryBuilder query : boolQuery.should()) {
+ if (isTimestampFieldInQuery(query)) {
+ return true;
+ }
+ }
+
+ // check the must_not condition
+ for (QueryBuilder query : boolQuery.mustNot()) {
+ if (isTimestampFieldInQuery(query)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Check whether a single QueryBuilder contains timestamp fields
+ *
+ * @param query QueryBuilder to be checked
+ * @return Return true if the timestamp field is included; otherwise
return false
+ */
+ private static boolean isTimestampFieldInQuery(QueryBuilder query) {
+ if (query instanceof RangeQueryBuilder) {
+ RangeQueryBuilder rangeQuery = (RangeQueryBuilder) query;
+ return "timestamp".equals(rangeQuery.fieldName());
+ }
+ return false;
+ }
+
private void dorisDataToLog(List<Map<String, Object>> tableColumnDTOS,
LogDTO logDTO) {
List<LogDataDTO> logDataList = Lists.newArrayList();
for (Map<String, Object> columnMap : tableColumnDTOS) {
@@ -471,6 +550,10 @@ public class EsDataServiceImpl implements EsDataService,
LogDataService, EsDataB
if (!StringUtils.isEmpty(interval)) {
BoolQueryBuilder queryBuilder =
searchLog.getQueryBuilder(logQuery, getKeyColonPrefix(logStore.getKeyList()));
String histogramField =
commonExtensionService.queryDateHistogramField(logQuery.getStoreId());
+ boolean isTimestampMissing = containsTimestampField(queryBuilder);
+ if (!isTimestampMissing) {
+ log.warn("searchRequest is missing timestamp field, add
timestamp field,logQuery:{}", GSON.toJson(logQuery));
+ }
EsClient.EsRet esRet = esService.dateHistogram(esIndex,
histogramField, interval, logQuery.getStartTime(), logQuery.getEndTime(),
queryBuilder);
result.setCounts(esRet.getCounts());
result.setTimestamps(esRet.getTimestamps());
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index f948bcbe..6b657be1 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -155,12 +155,16 @@ public class MilogConfigListener {
}
private void stopOldJobsForRemovedTailIds(SinkConfig sinkConfig) {
- List<Long> newIds =
sinkConfig.getLogtailConfigs().stream().map(LogtailConfig::getLogtailId).collect(Collectors.toList());
+ List<Long> newIds = Lists.newArrayList();
+ if (CollectionUtils.isNotEmpty(sinkConfig.getLogtailConfigs())) {
+ newIds =
sinkConfig.getLogtailConfigs().stream().map(LogtailConfig::getLogtailId).collect(Collectors.toList());
+ }
List<Long> oldIds = Lists.newArrayList();
if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
oldIds =
oldSinkConfigMap.get(sinkConfig.getLogstoreId()).getLogtailConfigs().stream().map(LogtailConfig::getLogtailId).collect(Collectors.toList());
}
- List<Long> collect = oldIds.stream().filter(tailId ->
!newIds.contains(tailId)).collect(Collectors.toList());
+ List<Long> finalNewIds = newIds;
+ List<Long> collect = oldIds.stream().filter(tailId ->
!finalNewIds.contains(tailId)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(collect)) {
log.info("newIds:{},oldIds:{},collect:{}", gson.toJson(newIds),
gson.toJson(oldIds), gson.toJson(collect));
for (Long tailId : collect) {
@@ -211,6 +215,9 @@ public class MilogConfigListener {
if (null != sinkConfig) {
log.info("[listen tail] The task to stop:{}",
gson.toJson(sinkConfig.getLogtailConfigs()));
List<LogtailConfig> logTailConfigs =
sinkConfig.getLogtailConfigs();
+ if (CollectionUtils.isEmpty(logTailConfigs)) {
+ return;
+ }
for (LogtailConfig logTailConfig : logTailConfigs) {
stopOldJobForTail(logTailConfig, sinkConfig);
}
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index 3270852e..41665703 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -96,9 +96,11 @@ public class JobManager {
public void stopJob(LogtailConfig logtailConfig) {
try {
- List<Long> jobKeys =
jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
- if(CollectionUtils.isNotEmpty(jobKeys)){
- log.info("【stop job】,all jobs:{}", jobKeys);
+ List<Long> jobKeys = jobs.keySet().stream()
+ .filter(key -> key.equals(logtailConfig.getLogtailId()))
+ .collect(Collectors.toList());
+ if (CollectionUtils.isNotEmpty(jobKeys)) {
+ log.info("stop job,the jobs:{}", jobKeys);
sinkJobsShutDown(logtailConfig);
}
} catch (Exception e) {
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
index d63f3896..c312e965 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/LogDataTransfer.java
@@ -21,6 +21,11 @@ package org.apache.ozhera.log.stream.job;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
+import com.xiaomi.youpin.docean.Ioc;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.api.model.msg.LineMessage;
import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.parse.LogParser;
@@ -31,10 +36,6 @@ import
org.apache.ozhera.log.stream.job.extension.MessageLifecycleManager;
import org.apache.ozhera.log.stream.job.extension.MessageSender;
import org.apache.ozhera.log.stream.job.extension.MqMessagePostProcessing;
import org.apache.ozhera.log.stream.sink.SinkChain;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.Map;
@@ -143,7 +144,9 @@ public class LogDataTransfer {
dataMap.putIfAbsent(LOG_STREAM_SPACE_ID,
sinkJobConfig.getLogSpaceId());
dataMap.putIfAbsent(LOG_STREAM_STORE_ID,
sinkJobConfig.getLogStoreId());
dataMap.putIfAbsent(LOG_STREAM_TAIL_ID, sinkJobConfig.getLogTailId());
- dataMap.putIfAbsent(DEPLOY_SPACE, sinkJobConfig.getDeploySpace());
+ if (StringUtils.isNotBlank(sinkJobConfig.getDeploySpace())) {
+ dataMap.putIfAbsent(DEPLOY_SPACE, sinkJobConfig.getDeploySpace());
+ }
}
private void sendMessage(Map<String, Object> dataMap) throws Exception {
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
index 08e3d0c8..7c878beb 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/plugin/es/EsPlugin.java
@@ -63,7 +63,7 @@ public class EsPlugin {
private static int DEFAULT_PROCESSOR_COUNT = 1;
- private static ReentrantLock esLock = new ReentrantLock();
+ private static final ReentrantLock esLock = new ReentrantLock();
private static Gson gson = new Gson();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]