This is an automated email from the ASF dual-hosted git repository.
gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 9f0d12fc refactor: log some bug fix and opzimize (#496)
9f0d12fc is described below
commit 9f0d12fcbf8e82c7f1b0fe06c481f26452f8fee7
Author: wtt <[email protected]>
AuthorDate: Fri Oct 25 16:53:46 2024 +0800
refactor: log some bug fix and opzimize (#496)
* refactor: fix log-stream monitor
* refactor: update set resource
---
.../agent/channel/WildcardChannelServiceImpl.java | 17 ++++++++++++-----
.../ozhera/log/agent/common/trace/TraceUtil.java | 2 +-
.../log/stream/config/MilogConfigListener.java | 20 ++++++++++++++++++--
pom.xml | 2 +-
.../trace/etl/extension/doris/WriteDorisService.java | 2 +-
.../trace/etl/extension/es/WriteEsService.java | 2 +-
.../trace/etl/consumer/MetricsParseService.java | 2 +-
.../org/apache/ozhera/trace/etl/util/TraceUtil.java | 2 +-
8 files changed, 36 insertions(+), 13 deletions(-)
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index d8c01138..c98292a1 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -15,6 +15,7 @@
*/
package org.apache.ozhera.log.agent.channel;
+import cn.hutool.system.SystemUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xiaomi.data.push.common.SafeRun;
@@ -25,6 +26,9 @@ import com.xiaomi.mone.file.common.FileInfo;
import com.xiaomi.mone.file.common.FileInfoCache;
import com.xiaomi.mone.file.listener.DefaultMonitorListener;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.agent.channel.file.MonitorFile;
import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
@@ -37,9 +41,6 @@ import org.apache.ozhera.log.api.enums.LogTypeEnum;
import org.apache.ozhera.log.api.model.meta.FilterConf;
import org.apache.ozhera.log.api.model.msg.LineMessage;
import org.apache.ozhera.log.common.PathUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import java.io.File;
import java.io.IOException;
@@ -102,6 +103,8 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
private DefaultMonitorListener defaultMonitorListener;
+ private HeraFileMonitor fileMonitor;
+
public WildcardChannelServiceImpl(MsgExporter msgExporter,
AgentMemoryService memoryService,
ChannelDefine channelDefine, FilterChain
chain, String memoryBasePath) {
@@ -143,7 +146,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
String restartFile = buildRestartFilePath();
FileInfoCache.ins().load(restartFile);
- HeraFileMonitor monitor =
createFileMonitor(input.getPatternCode(), ip);
+ fileMonitor = createFileMonitor(input.getPatternCode(), ip);
String fileExpression = buildFileExpression(input.getLogPattern());
@@ -157,7 +160,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
// Compile the file expression pattern
Pattern pattern = Pattern.compile(fileExpression);
for (String monitorPath : monitorPaths) {
- fileCollFutures.add(ExecutorUtil.submit(() ->
monitorFileChanges(monitor, monitorPath, pattern)));
+ fileCollFutures.add(ExecutorUtil.submit(() ->
monitorFileChanges(fileMonitor, monitorPath, pattern)));
}
} catch (Exception e) {
log.error("startCollectFile error, channelId: {}, input: {}, ip:
{}", channelId, GSON.toJson(input), ip, e);
@@ -223,6 +226,9 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
try {
log.info("monitorFileChanges,directory:{}", monitorPath);
monitor.reg(monitorPath, filePath -> {
+ if (SystemUtil.getOsInfo().isWindows()) {
+ return true;
+ }
boolean matches = pattern.matcher(filePath).matches();
log.debug("file: {}, matches: {}", filePath, matches);
return matches;
@@ -441,6 +447,7 @@ public class WildcardChannelServiceImpl extends
AbstractChannelService {
@Override
public void close() {
+ fileMonitor.stop();
log.info("Delete the current collection task,channelId:{}",
channelDefine.getChannelId());
//2. stop exporting
this.msgExporter.close();
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
index d494d948..5baf1bfb 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
@@ -123,7 +123,7 @@ public class TraceUtil {
span.setKind(toTKind(spanKind));
span.setEvents(toTEventList(JSONArray.parseArray(decodeLineBreak(array[MessageUtil.EVENTS]))));
span.setTotalRecordedEvents(span.getEventsSize());
- span.setResource(
+ span.setResouce(
toTResource(JSONObject.parseObject(array[MessageUtil.REOUSCES]),
specialAttrMap));
span.setExtra(toTExtra(specialAttrMap));
// using links["ref_type=CHILD_OF"] as parent span context and using
left as links
diff --git
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index 7190b74b..345fcda3 100644
---
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -239,6 +239,7 @@ public class MilogConfigListener {
* @param newMilogSpaceData
*/
private void initNewJob(MilogSpaceData newMilogSpaceData) {
+ stopOldJobsIfNeeded();
log.info("Start all tasks to restart the current space，spaceData:{}",
gson.toJson(newMilogSpaceData));
Map<Long, LogtailConfig> newLogTailConfigMap = new HashMap<>();
Map<Long, SinkConfig> newSinkConfigMap = new HashMap<>();
@@ -262,9 +263,24 @@ public class MilogConfigListener {
oldSinkConfigMap = newSinkConfigMap;
}
+ private void stopOldJobsIfNeeded() {
+ if (!oldLogTailConfigMap.isEmpty()) {
+ for (LogtailConfig value : oldLogTailConfigMap.values()) {
+ jobManager.stopJob(value);
+ }
+ oldLogTailConfigMap.clear();
+ }
+ if (!oldSinkConfigMap.isEmpty()) {
+ for (SinkConfig value : oldSinkConfigMap.values()) {
+ stopOldJobsForStore(value.getLogstoreId());
+ }
+ oldSinkConfigMap.clear();
+ }
+ }
+
private void startTailPer(SinkConfig sinkConfig, LogtailConfig
logTailConfig, Long logSpaceId) {
- if (null == logSpaceId) {
- log.warn("startTailPer error,logSpaceId is null,LogTailConfig:{}",
gson.toJson(logTailConfig));
+ if (null == logSpaceId || null == logTailConfig || null ==
logTailConfig.getLogtailId()) {
+ log.error("logSpaceId or logTailConfig or logTailId is
null,sinkConfig:{},logTailConfig:{},logSpaceId:{}", gson.toJson(sinkConfig),
gson.toJson(logTailConfig), spaceId);
return;
}
Boolean isStart =
streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig,
logSpaceId);
diff --git a/pom.xml b/pom.xml
index b96a4712..f3205e6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -634,7 +634,7 @@
<dependency>
<groupId>run.mone</groupId>
<artifactId>file</artifactId>
- <version>1.5.1-jdk21</version>
+ <version>1.6.0-jdk21</version>
</dependency>
<dependency>
<groupId>run.mone</groupId>
diff --git
a/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
b/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
index ec38d5e7..32d954e8 100644
---
a/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
+++
b/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
@@ -167,7 +167,7 @@ public class WriteDorisService {
// build logs
spanData.put(HeraTraceSpanColumn.logs,
GSON.toJson(buildLogs(tSpanData.getEvents())));
// build process
- spanData.put(HeraTraceSpanColumn.process,
GSON.toJson(buildProcess(tSpanData.getExtra().getServiceName(),
tSpanData.getResource())));
+ spanData.put(HeraTraceSpanColumn.process,
GSON.toJson(buildProcess(tSpanData.getExtra().getServiceName(),
tSpanData.getResouce())));
return spanData;
}
diff --git
a/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
b/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
index 7e2dc111..5deeaa68 100644
---
a/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
+++
b/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
@@ -127,7 +127,7 @@ public class WriteEsService {
// build logs
jaegerESDomain.setLogs(buildLogs(tSpanData.getEvents()));
// build process
-
jaegerESDomain.setProcess(buildProcess(tSpanData.getExtra().getServiceName(),
tSpanData.getResource()));
+
jaegerESDomain.setProcess(buildProcess(tSpanData.getExtra().getServiceName(),
tSpanData.getResouce()));
return JSONObject.toJSONString(jaegerESDomain,
SerializerFeature.WriteMapNullValue);
}
diff --git
a/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
b/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
index 5b224f73..c0f01ac7 100644
---
a/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
+++
b/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
@@ -278,7 +278,7 @@ public class MetricsParseService implements
IMetricsParseService {
}
}
// Gets the properties in process
- TResource resource = tSpanData.getResource();
+ TResource resource = tSpanData.getResouce();
if (resource != null) {
TAttributes resourceAttributes = resource.getAttributes();
List<TAttributeKey> resourceKeys = resourceAttributes.getKeys();
diff --git
a/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
b/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
index fd752d63..2f941545 100644
---
a/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
+++
b/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
@@ -106,7 +106,7 @@ public class TraceUtil {
span.setKind(toTKind(spanKind));
span.setEvents(toTEventList(JSONArray.parseArray(decodeLineBreak(array[MessageUtil.EVENTS]))));
span.setTotalRecordedEvents(span.getEventsSize());
- span.setResource(
+ span.setResouce(
toTResource(JSONObject.parseObject(array[MessageUtil.REOUSCES]),
specialAttrMap));
span.setExtra(toTExtra(specialAttrMap));
// using links["ref_type=CHILD_OF"] as parent span context and using
left as links
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]