This is an automated email from the ASF dual-hosted git repository.

dingtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d757475 feat: introducing a log filter chain based on Pipeline mode 
(#616)
5d757475 is described below

commit 5d75747596b81c7361897df6a361ffe89f86dde4
Author: wtt <[email protected]>
AuthorDate: Tue Nov 25 17:00:50 2025 +0800

    feat: introducing a log filter chain based on Pipeline mode (#616)
    
    * fix: solve the blocking problem caused by serverless startup
    
    * feat: introducing a log filter chain based on Pipeline mode
    
    * refactor: fix null pointer exception in configuration acquisition logic
---
 ozhera-log/log-agent/pom.xml                       |  2 +-
 .../ozhera/log/agent/channel/ChannelEngine.java    | 30 +++++-----
 .../log/agent/channel/ChannelServiceFactory.java   | 26 ++++-----
 .../log/agent/channel/ChannelServiceImpl.java      | 17 +++++-
 .../agent/channel/WildcardChannelServiceImpl.java  | 30 ++++++----
 .../log/agent/channel/pipeline/Pipeline.java       | 65 ++++++++++++++++++++++
 .../log/agent/channel/pipeline/RequestContext.java | 41 ++++++++++++++
 .../ozhera/log/agent/channel/pipeline/Valve.java   | 40 +++++++++++++
 .../log/agent/channel/pipeline/package-info.java   | 27 +++++++++
 .../ozhera/log/agent/common/ChannelUtil.java       | 36 +++++++++---
 10 files changed, 261 insertions(+), 53 deletions(-)

diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index 3a29698a..d8bb355c 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-agent</artifactId>
-    <version>2.2.14-SNAPSHOT</version>
+    <version>2.2.15-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index e339b6a9..cf67339a 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -42,6 +42,8 @@ import 
org.apache.ozhera.log.agent.channel.locator.ChannelDefineLocator;
 import org.apache.ozhera.log.agent.channel.locator.ChannelDefineRpcLocator;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryServiceImpl;
+import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
+import org.apache.ozhera.log.agent.common.ChannelUtil;
 import org.apache.ozhera.log.agent.common.ExecutorUtil;
 import org.apache.ozhera.log.agent.export.MsgExporter;
 import org.apache.ozhera.log.agent.factory.OutPutServiceFactory;
@@ -89,8 +91,12 @@ public class ChannelEngine {
 
     private ChannelServiceFactory channelServiceFactory;
 
+    private Pipeline pipeline;
+
     private String memoryBasePath;
 
+    private Config config;
+
     private final Gson gson = GSON;
 
     private static final String PROGRESS_ENV_KEY = 
"CHANNEL_STATE_PERIOD_SECONDS";
@@ -112,7 +118,7 @@ public class ChannelEngine {
     public void init() {
         List<Long> failedChannelId = Lists.newArrayList();
         try {
-            Config config = Ioc.ins().getBean(Config.class.getName());
+            config = Ioc.ins().getBean(Config.class.getName());
             memoryBasePath = config.get("agent.memory.path", 
AgentMemoryService.DEFAULT_BASE_PATH);
             //talosProducerMap = new ConcurrentHashMap<>(512);
 
@@ -121,6 +127,7 @@ public class ChannelEngine {
             log.info("current agent all config meta:{}", 
gson.toJson(channelDefineList));
             agentMemoryService = new AgentMemoryServiceImpl(memoryBasePath);
             fileMonitorListener = new DefaultFileMonitorListener();
+            pipeline = new Pipeline();
 
             channelServiceFactory = new 
ChannelServiceFactory(agentMemoryService, memoryBasePath);
 
@@ -128,7 +135,7 @@ public class ChannelEngine {
             channelServiceList = channelDefineList.parallelStream()
                     .filter(channelDefine -> 
filterCollStart(channelDefine.getAppName()))
                     .map(channelDefine -> {
-                        ChannelService channelService = 
this.channelServiceTrans(channelDefine);
+                        ChannelService channelService = 
this.channelServiceTrans(channelDefine, pipeline);
                         if (null == channelService) {
                             failedChannelId.add(channelDefine.getChannelId());
                         }
@@ -155,10 +162,7 @@ public class ChannelEngine {
     }
 
     private void resolveCompressEnabled() {
-        String raw = System.getenv(COMPRESS_KEY);
-        if (StringUtils.isBlank(raw)) {
-            raw = System.getProperty(COMPRESS_KEY);
-        }
+        String raw = ChannelUtil.getConfig(COMPRESS_KEY, config);
         if (StringUtils.isNotBlank(raw)) {
             try {
                 progressCompressValue = Boolean.parseBoolean(raw);
@@ -237,11 +241,7 @@ public class ChannelEngine {
     }
 
     private long resolvePeriodSeconds() {
-        String raw = System.getenv(PROGRESS_ENV_KEY);
-        if (StringUtils.isBlank(raw)) {
-            raw = System.getProperty(PROGRESS_ENV_KEY);
-        }
-
+        String raw = ChannelUtil.getConfig(PROGRESS_ENV_KEY, config);
         if (StringUtils.isBlank(raw)) {
             return DEFAULT_PERIOD_SECONDS;
         }
@@ -301,7 +301,7 @@ public class ChannelEngine {
         }));
     }
 
-    private ChannelService channelServiceTrans(ChannelDefine channelDefine) {
+    private ChannelService channelServiceTrans(ChannelDefine channelDefine, 
Pipeline pipeline) {
         try {
             preCheckChannelDefine(channelDefine);
             Output output = channelDefine.getOutput();
@@ -316,7 +316,7 @@ public class ChannelEngine {
                 agentMemoryService = new 
AgentMemoryServiceImpl(org.apache.ozhera.log.common.Config.ins().get("agent.memory.path",
 AgentMemoryService.DEFAULT_BASE_PATH));
             }
             log.info("channelServiceTrans,channelDefine,channelId:{}", 
channelDefine.getChannelId());
-            return channelServiceFactory.createChannelService(channelDefine, 
exporter, filterChain);
+            return channelServiceFactory.createChannelService(channelDefine, 
exporter, filterChain, pipeline);
         } catch (Throwable e) {
             log.error("channelServiceTrans exception, channelDefine:{}", 
gson.toJson(channelDefine), e);
         }
@@ -626,7 +626,7 @@ public class ChannelEngine {
     private List<ChannelDefine> intersection(List<ChannelDefine> origin, 
List<ChannelDefine> source) {
         List<Long> sourceIds = Lists.newArrayList();
         if (CollectionUtils.isNotEmpty(source)) {
-            sourceIds = 
source.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
+            sourceIds = 
source.stream().map(ChannelDefine::getChannelId).toList();
         }
         List<Long> finalSourceIds = sourceIds;
         return origin.stream().filter(channelDefine -> 
finalSourceIds.contains(channelDefine.getChannelId()) && 
OperateEnum.DELETE_OPERATE != 
channelDefine.getOperateEnum()).collect(Collectors.toList());
@@ -643,7 +643,7 @@ public class ChannelEngine {
                 .filter(Objects::nonNull)
                 .filter(channelDefine -> 
filterCollStart(channelDefine.getAppName()))
                 .map(channelDefine -> {
-                    ChannelService channelService = 
channelServiceTrans(channelDefine);
+                    ChannelService channelService = 
channelServiceTrans(channelDefine, pipeline);
                     if (null == channelService) {
                         failedChannelId.add(channelDefine.getChannelId());
                     }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
index 6d2f042e..ab9ac759 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceFactory.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
+import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
 import org.apache.ozhera.log.agent.export.MsgExporter;
 import org.apache.ozhera.log.agent.filter.FilterChain;
 import org.apache.ozhera.log.agent.input.Input;
@@ -30,7 +31,6 @@ import org.apache.ozhera.log.api.enums.LogTypeEnum;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import static org.apache.ozhera.log.common.Constant.SYMBOL_COMMA;
@@ -67,8 +67,8 @@ public class ChannelServiceFactory {
         return CollectionUtils.isNotEmpty(multiSpecialFileSuffix) && 
logPattern.contains("*") && 
multiSpecialFileSuffix.stream().anyMatch(logPattern::endsWith);
     }
 
-    public ChannelService createChannelService(ChannelDefine channelDefine,
-                                               MsgExporter exporter, 
FilterChain filterChain) {
+    public ChannelService createChannelService(ChannelDefine channelDefine, 
MsgExporter exporter,
+                                               FilterChain filterChain, 
Pipeline pipeline) {
         if (channelDefine == null || channelDefine.getInput() == null) {
             throw new IllegalArgumentException("Channel define or input cannot 
be null");
         }
@@ -78,18 +78,18 @@ public class ChannelServiceFactory {
         String logPattern = input.getLogPattern();
 
         if (isSpecialFilePath(logPattern)) {
-            return createStandardChannelService(exporter, channelDefine, 
filterChain);
+            return createStandardChannelService(exporter, channelDefine, 
filterChain, pipeline);
         }
 
         if (LogTypeEnum.OPENTELEMETRY == LogTypeEnum.name2enum(logType) || 
FileUtil.exist(logPattern)) {
-            return createStandardChannelService(exporter, channelDefine, 
filterChain);
+            return createStandardChannelService(exporter, channelDefine, 
filterChain, pipeline);
         }
 
         if (shouldUseWildcardService(logPattern)) {
-            return createWildcardChannelService(exporter, channelDefine, 
filterChain);
+            return createWildcardChannelService(exporter, channelDefine, 
filterChain, pipeline);
         }
 
-        return createStandardChannelService(exporter, channelDefine, 
filterChain);
+        return createStandardChannelService(exporter, channelDefine, 
filterChain, pipeline);
     }
 
     private boolean shouldUseWildcardService(String logPattern) {
@@ -134,15 +134,15 @@ public class ChannelServiceFactory {
         return false;
     }
 
-    private ChannelService createStandardChannelService(MsgExporter exporter,
-                                                        ChannelDefine 
channelDefine, FilterChain filterChain) {
+    private ChannelService createStandardChannelService(MsgExporter exporter, 
ChannelDefine channelDefine,
+                                                        FilterChain 
filterChain, Pipeline pipeline) {
         return new ChannelServiceImpl(exporter, agentMemoryService,
-                channelDefine, filterChain);
+                channelDefine, filterChain, pipeline);
     }
 
-    private ChannelService createWildcardChannelService(MsgExporter exporter,
-                                                        ChannelDefine 
channelDefine, FilterChain filterChain) {
+    private ChannelService createWildcardChannelService(MsgExporter exporter, 
ChannelDefine channelDefine,
+                                                        FilterChain 
filterChain, Pipeline pipeline) {
         return new WildcardChannelServiceImpl(exporter, agentMemoryService,
-                channelDefine, filterChain, memoryBasePath);
+                channelDefine, filterChain, memoryBasePath, pipeline);
     }
 }
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
index d4eb0dae..6611c0eb 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelServiceImpl.java
@@ -33,6 +33,8 @@ import 
org.apache.ozhera.log.agent.channel.file.InodeFileComparator;
 import org.apache.ozhera.log.agent.channel.file.MonitorFile;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
 import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
+import org.apache.ozhera.log.agent.channel.pipeline.RequestContext;
 import org.apache.ozhera.log.agent.common.ChannelUtil;
 import org.apache.ozhera.log.agent.common.ExecutorUtil;
 import org.apache.ozhera.log.agent.export.MsgExporter;
@@ -131,12 +133,17 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
 
     private String linePrefix;
 
-    public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService 
memoryService, ChannelDefine channelDefine, FilterChain chain) {
+
+    private Pipeline pipeline;
+
+    public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService 
memoryService,
+                              ChannelDefine channelDefine, FilterChain chain, 
Pipeline pipeline) {
         this.memoryService = memoryService;
         this.msgExporter = msgExporter;
         this.channelDefine = channelDefine;
         this.chain = chain;
         this.monitorFileList = Lists.newArrayList();
+        this.pipeline = pipeline;
     }
 
     @Override
@@ -263,7 +270,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                     cleanFile(path::equals);
                     continue;
                 }
-                
+
                 if (ChannelUtil.isInodeChanged(channelMemory, path)) {
                     Long[] inodeInfo = ChannelUtil.getInodeInfo(channelMemory, 
path);
                     log.info("deleted file cleanup trigger for inode changed 
path:{}, oldInode:{}, newInode:{}",
@@ -470,6 +477,10 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
     }
 
     private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String pattern, String patternCode, long ct) {
+        RequestContext requestContext = 
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
+        if (pipeline.invoke(requestContext)) {
+            return;
+        }
         LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
pattern, patternCode, ct);
 
         updateChannelMemory(channelMemory, pattern, logTypeEnum, ct, 
readResult);
@@ -665,7 +676,7 @@ public class ChannelServiceImpl extends 
AbstractChannelService {
                             filePath, gson.toJson(memoryUnixFileNode), 
gson.toJson(currentUnixFileNode));
                 }
             }
-            
+
             // Check if pointer exceeds file length (file may have been 
truncated but inode unchanged)
             // This handles the case where log rotation truncates the file 
without changing inode
             java.io.File file = new java.io.File(filePath);
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
index b1778881..ef62b742 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/WildcardChannelServiceImpl.java
@@ -35,6 +35,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.agent.channel.file.MonitorFile;
 import org.apache.ozhera.log.agent.channel.memory.AgentMemoryService;
 import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import org.apache.ozhera.log.agent.channel.pipeline.Pipeline;
+import org.apache.ozhera.log.agent.channel.pipeline.RequestContext;
 import org.apache.ozhera.log.agent.common.ChannelUtil;
 import org.apache.ozhera.log.agent.common.ExecutorUtil;
 import org.apache.ozhera.log.agent.export.MsgExporter;
@@ -49,11 +51,7 @@ import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
@@ -115,13 +113,17 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
     private final Map<String, Long> fileTruncationCheckMap = new 
ConcurrentHashMap<>();
 
 
+    private Pipeline pipeline;
+
     public WildcardChannelServiceImpl(MsgExporter msgExporter, 
AgentMemoryService memoryService,
-                                      ChannelDefine channelDefine, FilterChain 
chain, String memoryBasePath) {
+                                      ChannelDefine channelDefine, FilterChain 
chain,
+                                      String memoryBasePath, Pipeline 
pipeline) {
         this.memoryService = memoryService;
         this.msgExporter = msgExporter;
         this.channelDefine = channelDefine;
         this.chain = chain;
         this.memoryBasePath = memoryBasePath;
+        this.pipeline = pipeline;
     }
 
     @Override
@@ -278,24 +280,24 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
         try {
             ConcurrentHashMap<String, com.xiaomi.mone.file.ozhera.HeraFile> 
fileMap = fileMonitor.getFileMap();
             ConcurrentHashMap<Object, com.xiaomi.mone.file.ozhera.HeraFile> 
map = fileMonitor.getMap();
-            
+
             List<ReadListener> listeners = 
defaultMonitorListener.getReadListenerList();
-            
+
             for (ReadListener readListener : listeners) {
                 if (readListener instanceof 
com.xiaomi.mone.file.listener.OzHeraReadListener) {
-                    com.xiaomi.mone.file.listener.OzHeraReadListener 
ozHeraReadListener = 
+                    com.xiaomi.mone.file.listener.OzHeraReadListener 
ozHeraReadListener =
                             (com.xiaomi.mone.file.listener.OzHeraReadListener) 
readListener;
                     com.xiaomi.mone.file.LogFile2 logFile = 
ozHeraReadListener.getLogFile();
                     if (logFile == null) {
                         continue;
                     }
-                    
+
                     String filePath = logFile.getFile();
                     Object logFileKey = logFile.getFileKey();
-                    
+
                     // Check if file path still exists in fileMap (may be new 
file with same path but different inode)
                     com.xiaomi.mone.file.ozhera.HeraFile currentHeraFile = 
fileMap.get(filePath);
-                    
+
                     // Shutdown old file handle if inode changed (same path, 
different inode)
                     if (currentHeraFile != null && !Objects.equals(logFileKey, 
currentHeraFile.getFileKey())) {
                         log.info("inode changed for path:{}, oldInode:{}, 
newInode:{}, shutting down old file handle",
@@ -442,6 +444,10 @@ public class WildcardChannelServiceImpl extends 
AbstractChannelService {
     }
 
     private void wrapDataToSend(String lineMsg, AtomicReference<ReadResult> 
readResult, String patternCode, long ct) {
+        RequestContext requestContext = 
RequestContext.builder().channelDefine(channelDefine).readResult(readResult.get()).lineMsg(lineMsg).build();
+        if (pipeline.invoke(requestContext)) {
+            return;
+        }
         String filePathName = readResult.get().getFilePathName();
         LineMessage lineMessage = createLineMessage(lineMsg, readResult, 
filePathName, patternCode, ct);
         updateChannelMemory(channelMemory, filePathName, getLogTypeEnum(), ct, 
readResult);
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
new file mode 100644
index 00000000..511496a5
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Pipeline.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.channel.pipeline;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import static org.apache.ozhera.log.common.Constant.GSON;
+
+/**
+ * @author wtt
+ * @date 2025/11/21 15:07
+ * @version 1.0
+ */
+@Slf4j
+public class Pipeline {
+    private final List<Valve> valves = new ArrayList<>();
+
+    public Pipeline() {
+        ServiceLoader<Valve> loader = ServiceLoader.load(Valve.class);
+        for (Valve valve : loader) {
+            valves.add(valve);
+        }
+        valves.sort(Comparator.naturalOrder());
+        log.info("Pipeline valves: {}", GSON.toJson(valves));
+    }
+
+
+    public void removeValve(Valve valve) {
+        valves.remove(valve);
+    }
+
+    public boolean invoke(RequestContext ctx) {
+        for (Valve valve : valves) {
+            if (!valve.shouldExecute(ctx)) {
+                continue;
+            }
+            boolean shouldContinue = valve.invoke(ctx);
+            if (!shouldContinue) {
+                return false;
+            }
+        }
+        return true;
+    }
+}
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/RequestContext.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/RequestContext.java
new file mode 100644
index 00000000..07b93765
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/RequestContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.channel.pipeline;
+
+import com.xiaomi.mone.file.ReadResult;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.ozhera.log.agent.channel.ChannelDefine;
+
+/**
+ * @author wtt
+ * @date 2025/11/21 15:09
+ * @version 1.0
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class RequestContext {
+    private ChannelDefine channelDefine;
+    private ReadResult readResult;
+    private String lineMsg;
+}
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Valve.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Valve.java
new file mode 100644
index 00000000..5c02d4a5
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/Valve.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ozhera.log.agent.channel.pipeline;
+
+/**
+ * @author wtt
+ * @date 2025/11/21 15:08
+ * @version 1.0
+ */
+public interface Valve extends Comparable<Valve> {
+
+    boolean invoke(RequestContext ctx);
+
+    default boolean shouldExecute(RequestContext ctx) {
+        return true;
+    }
+
+    int getOrder();
+
+    @Override
+    default int compareTo(Valve o) {
+        return Integer.compare(this.getOrder(), o.getOrder());
+    }
+}
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/package-info.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/package-info.java
new file mode 100644
index 00000000..bf7162ab
--- /dev/null
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/pipeline/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * @author wtt
+ * @date 2025/11/21 15:04
+ * @version 1.0
+ *
+ * 日志过滤器链,这个过滤器使用了pipeline模式,参考了tomcat的过滤器链,通过SPI机制,
+ * 将日志过滤器加入到过滤器链中,并实现日志过滤器接口,实现日志过滤逻辑
+ */
+package org.apache.ozhera.log.agent.channel.pipeline;
\ No newline at end of file
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
index ac7d60fa..33896787 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/common/ChannelUtil.java
@@ -19,9 +19,10 @@
 package org.apache.ozhera.log.agent.common;
 
 import com.google.common.collect.Lists;
-import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
+import com.xiaomi.youpin.docean.plugin.config.Config;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.agent.channel.memory.ChannelMemory;
 
 import java.io.File;
 import java.nio.file.Files;
@@ -112,15 +113,15 @@ public class ChannelUtil {
         if (channelMemory == null || channelMemory.getFileProgressMap() == 
null) {
             return null;
         }
-        
+
         ChannelMemory.FileProgress fileProgress = 
channelMemory.getFileProgressMap().get(filePath);
         if (fileProgress == null || fileProgress.getUnixFileNode() == null) {
             return null;
         }
-        
+
         ChannelMemory.UnixFileNode memoryInode = 
fileProgress.getUnixFileNode();
         ChannelMemory.UnixFileNode currentInode = buildUnixFileNode(filePath);
-        
+
         return new ChannelMemory.UnixFileNode[]{memoryInode, currentInode};
     }
 
@@ -136,12 +137,12 @@ public class ChannelUtil {
         if (inodePair == null) {
             return false;
         }
-        
+
         ChannelMemory.UnixFileNode memoryInode = inodePair[0];
         ChannelMemory.UnixFileNode currentInode = inodePair[1];
-        
+
         return memoryInode.getSt_ino() != null && currentInode.getSt_ino() != 
null &&
-               !java.util.Objects.equals(memoryInode.getSt_ino(), 
currentInode.getSt_ino());
+                !java.util.Objects.equals(memoryInode.getSt_ino(), 
currentInode.getSt_ino());
     }
 
     /**
@@ -156,10 +157,10 @@ public class ChannelUtil {
         if (inodePair == null) {
             return null;
         }
-        
+
         ChannelMemory.UnixFileNode memoryInode = inodePair[0];
         ChannelMemory.UnixFileNode currentInode = inodePair[1];
-        
+
         if (memoryInode.getSt_ino() != null && currentInode.getSt_ino() != 
null) {
             return new Long[]{memoryInode.getSt_ino(), 
currentInode.getSt_ino()};
         }
@@ -181,4 +182,21 @@ public class ChannelUtil {
         return count;
     }
 
+    /**
+     * 获取配置,先从环境变量中获取,再从系统属性中获取,最后从配置文件中获取
+     * @param key 配置的key
+     * @param config 配置对象
+     * @return 配置的值
+     */
+    public static String getConfig(String key, Config config) {
+        String raw = System.getenv(key);
+        if (StringUtils.isBlank(raw)) {
+            raw = System.getProperty(key);
+        }
+        if (StringUtils.isBlank(raw) && null != config) {
+            raw = config.get(key, "");
+        }
+        return raw;
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to