This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f0d12fc refactor: log some bug fix and optimize (#496)
9f0d12fc is described below

commit 9f0d12fcbf8e82c7f1b0fe06c481f26452f8fee7
Author: wtt <[email protected]>
AuthorDate: Fri Oct 25 16:53:46 2024 +0800

    refactor: log some bug fix and optimize (#496)
    
    * refactor: fix log-stream monitor
    
    * refactor: update set resource
---
 .../agent/channel/WildcardChannelServiceImpl.java    | 17 ++++++++++++-----
 .../ozhera/log/agent/common/trace/TraceUtil.java     |  2 +-
 .../log/stream/config/MilogConfigListener.java       | 20 ++++++++++++++++++--
 pom.xml                                              |  2 +-
 .../trace/etl/extension/doris/WriteDorisService.java |  2 +-
 .../trace/etl/extension/es/WriteEsService.java       |  2 +-
 .../trace/etl/consumer/MetricsParseService.java      |  2 +-
 .../org/apache/ozhera/trace/etl/util/TraceUtil.java  |  2 +-
 8 files changed, 36 insertions(+), 13 deletions(-)

diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index d8c01138..c98292a1 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -15,6 +15,7 @@
  */
 package org.apache.ozhera.log.agent.channel;
 
+import cn.hutool.system.SystemUtil;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.xiaomi.data.push.common.SafeRun;
@@ -25,6 +26,9 @@ import com.xiaomi.mone.file.common.FileInfo;
 import com.xiaomi.mone.file.common.FileInfoCache;
 import com.xiaomi.mone.file.listener.DefaultMonitorListener;
 import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.agent.channel.file.MonitorFile;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
 import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
@@ -37,9 +41,6 @@ import org.apache.ozhera.log.api.enums.LogTypeEnum;
 import org.apache.ozhera.log.api.model.meta.FilterConf;
 import org.apache.ozhera.log.api.model.msg.LineMessage;
 import org.apache.ozhera.log.common.PathUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -102,6 +103,8 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     private DefaultMonitorListener defaultMonitorListener;
 
+    private HeraFileMonitor fileMonitor;
+
 
     public WildcardChannelServiceImpl(MsgExporter msgExporter, 
AgentMemoryService memoryService,
                                       ChannelDefine channelDefine, FilterChain 
chain, String memoryBasePath) {
@@ -143,7 +146,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             String restartFile = buildRestartFilePath();
             FileInfoCache.ins().load(restartFile);
 
-            HeraFileMonitor monitor = 
createFileMonitor(input.getPatternCode(), ip);
+            fileMonitor = createFileMonitor(input.getPatternCode(), ip);
 
             String fileExpression = buildFileExpression(input.getLogPattern());
 
@@ -157,7 +160,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
             // Compile the file expression pattern
             Pattern pattern = Pattern.compile(fileExpression);
             for (String monitorPath : monitorPaths) {
-                fileCollFutures.add(ExecutorUtil.submit(() -> 
monitorFileChanges(monitor, monitorPath, pattern)));
+                fileCollFutures.add(ExecutorUtil.submit(() -> 
monitorFileChanges(fileMonitor, monitorPath, pattern)));
             }
         } catch (Exception e) {
             log.error("startCollectFile error, channelId: {}, input: {}, ip: 
{}", channelId, GSON.toJson(input), ip, e);
@@ -223,6 +226,9 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
         try {
             log.info("monitorFileChanges,directory:{}", monitorPath);
             monitor.reg(monitorPath, filePath -> {
+                if (SystemUtil.getOsInfo().isWindows()) {
+                    return true;
+                }
                 boolean matches = pattern.matcher(filePath).matches();
                 log.debug("file: {}, matches: {}", filePath, matches);
                 return matches;
@@ -441,6 +447,7 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
 
     @Override
     public void close() {
+        fileMonitor.stop();
         log.info("Delete the current collection task,channelId:{}", 
channelDefine.getChannelId());
         //2. stop exporting
         this.msgExporter.close();
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
index d494d948..5baf1bfb 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/trace/TraceUtil.java
@@ -123,7 +123,7 @@ public class TraceUtil {
         span.setKind(toTKind(spanKind));
         
span.setEvents(toTEventList(JSONArray.parseArray(decodeLineBreak(array[MessageUtil.EVENTS]))));
         span.setTotalRecordedEvents(span.getEventsSize());
-        span.setResource(
+        span.setResouce(
                 
toTResource(JSONObject.parseObject(array[MessageUtil.REOUSCES]), 
specialAttrMap));
         span.setExtra(toTExtra(specialAttrMap));
         // using links["ref_type=CHILD_OF"] as parent span context and using 
left as links
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index 7190b74b..345fcda3 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -239,6 +239,7 @@ public class MilogConfigListener {
      * @param newMilogSpaceData
      */
     private void initNewJob(MilogSpaceData newMilogSpaceData) {
+        stopOldJobsIfNeeded();
         log.info("Start all tasks to restart the current space,spaceData:{}", 
gson.toJson(newMilogSpaceData));
         Map<Long, LogtailConfig> newLogTailConfigMap = new HashMap<>();
         Map<Long, SinkConfig> newSinkConfigMap = new HashMap<>();
@@ -262,9 +263,24 @@ public class MilogConfigListener {
         oldSinkConfigMap = newSinkConfigMap;
     }
 
+    private void stopOldJobsIfNeeded() {
+        if (!oldLogTailConfigMap.isEmpty()) {
+            for (LogtailConfig value : oldLogTailConfigMap.values()) {
+                jobManager.stopJob(value);
+            }
+            oldLogTailConfigMap.clear();
+        }
+        if (!oldSinkConfigMap.isEmpty()) {
+            for (SinkConfig value : oldSinkConfigMap.values()) {
+                stopOldJobsForStore(value.getLogstoreId());
+            }
+            oldSinkConfigMap.clear();
+        }
+    }
+
     private void startTailPer(SinkConfig sinkConfig, LogtailConfig 
logTailConfig, Long logSpaceId) {
-        if (null == logSpaceId) {
-            log.warn("startTailPer error,logSpaceId is null,LogTailConfig:{}", 
gson.toJson(logTailConfig));
+        if (null == logSpaceId || null == logTailConfig || null == 
logTailConfig.getLogtailId()) {
+            log.error("logSpaceId or logTailConfig or logTailId is 
null,sinkConfig:{},logTailConfig:{},logSpaceId:{}", gson.toJson(sinkConfig), 
gson.toJson(logTailConfig), spaceId);
             return;
         }
         Boolean isStart = 
streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig, 
logSpaceId);
diff --git a/pom.xml b/pom.xml
index b96a4712..f3205e6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -634,7 +634,7 @@
             <dependency>
                 <groupId>run.mone</groupId>
                 <artifactId>file</artifactId>
-                <version>1.5.1-jdk21</version>
+                <version>1.6.0-jdk21</version>
             </dependency>
             <dependency>
                 <groupId>run.mone</groupId>
diff --git 
a/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
 
b/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
index ec38d5e7..32d954e8 100644
--- 
a/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
+++ 
b/trace-etl/trace-etl-extensions/trace-etl-doris-extension/src/main/java/org/apache/ozhera/trace/etl/extension/doris/WriteDorisService.java
@@ -167,7 +167,7 @@ public class WriteDorisService {
         // build logs
         spanData.put(HeraTraceSpanColumn.logs, 
GSON.toJson(buildLogs(tSpanData.getEvents())));
         // build process
-        spanData.put(HeraTraceSpanColumn.process, 
GSON.toJson(buildProcess(tSpanData.getExtra().getServiceName(), 
tSpanData.getResource())));
+        spanData.put(HeraTraceSpanColumn.process, 
GSON.toJson(buildProcess(tSpanData.getExtra().getServiceName(), 
tSpanData.getResouce())));
         return spanData;
     }
 
diff --git 
a/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
 
b/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
index 7e2dc111..5deeaa68 100644
--- 
a/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
+++ 
b/trace-etl/trace-etl-extensions/trace-etl-es-extension/src/main/java/org/apache/ozhera/trace/etl/extension/es/WriteEsService.java
@@ -127,7 +127,7 @@ public class WriteEsService {
         // build logs
         jaegerESDomain.setLogs(buildLogs(tSpanData.getEvents()));
         // build process
-        
jaegerESDomain.setProcess(buildProcess(tSpanData.getExtra().getServiceName(), 
tSpanData.getResource()));
+        
jaegerESDomain.setProcess(buildProcess(tSpanData.getExtra().getServiceName(), 
tSpanData.getResouce()));
         return JSONObject.toJSONString(jaegerESDomain, 
SerializerFeature.WriteMapNullValue);
     }
 
diff --git 
a/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
 
b/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
index 5b224f73..c0f01ac7 100644
--- 
a/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
+++ 
b/trace-etl/trace-etl-server/src/main/java/org/apache/ozhera/trace/etl/consumer/MetricsParseService.java
@@ -278,7 +278,7 @@ public class MetricsParseService implements 
IMetricsParseService {
             }
         }
         // Gets the properties in process
-        TResource resource = tSpanData.getResource();
+        TResource resource = tSpanData.getResouce();
         if (resource != null) {
             TAttributes resourceAttributes = resource.getAttributes();
             List<TAttributeKey> resourceKeys = resourceAttributes.getKeys();
diff --git 
a/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
 
b/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
index fd752d63..2f941545 100644
--- 
a/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
+++ 
b/trace-etl/trace-etl-service/src/main/java/org/apache/ozhera/trace/etl/util/TraceUtil.java
@@ -106,7 +106,7 @@ public class TraceUtil {
         span.setKind(toTKind(spanKind));
         
span.setEvents(toTEventList(JSONArray.parseArray(decodeLineBreak(array[MessageUtil.EVENTS]))));
         span.setTotalRecordedEvents(span.getEventsSize());
-        span.setResource(
+        span.setResouce(
                 
toTResource(JSONObject.parseObject(array[MessageUtil.REOUSCES]), 
specialAttrMap));
         span.setExtra(toTExtra(specialAttrMap));
         // using links["ref_type=CHILD_OF"] as parent span context and using 
left as links


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to