This is an automated email from the ASF dual-hosted git repository.

gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 86b5eea3 feat: support configuring compression and export cycles - 
added environment variables (#611)
86b5eea3 is described below

commit 86b5eea3214d8a40e7936e45ad280c871ce3530e
Author: wtt <[email protected]>
AuthorDate: Wed Nov 12 16:07:23 2025 +0800

    feat: support configuring compression and export cycles - added environment 
variables (#611)
---
 ozhera-log/log-agent-server/pom.xml                |  3 +-
 .../service/DefaultPublishConfigService.java       | 33 ++++++++++++--
 ozhera-log/log-agent/pom.xml                       |  3 +-
 .../ozhera/log/agent/channel/ChannelEngine.java    | 53 +++++++++++++++++++++-
 .../log/agent/rpc/processor/LogProcessor.java      |  2 +-
 .../apache/ozhera/log/agent/rpc/task/PingTask.java |  4 +-
 ozhera-log/log-common/pom.xml                      |  2 +-
 .../apache/ozhera/log/parse/PlaceholderParser.java |  8 +++-
 8 files changed, 95 insertions(+), 13 deletions(-)

diff --git a/ozhera-log/log-agent-server/pom.xml 
b/ozhera-log/log-agent-server/pom.xml
index a60334ba..5c1f9796 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -26,7 +26,7 @@ http://www.apache.org/licenses/LICENSE-2.0
         <version>2.2.6-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <version>2.2.7-SNAPSHOT</version>
+    <version>2.2.8-SNAPSHOT</version>
 
     <artifactId>log-agent-server</artifactId>
 
@@ -66,6 +66,7 @@ http://www.apache.org/licenses/LICENSE-2.0
         <dependency>
             <groupId>run.mone</groupId>
             <artifactId>rpc</artifactId>
+            <version>1.6.4-jdk21-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>ch.qos.logback</groupId>
diff --git 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
index e642988a..ff97e622 100644
--- 
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
+++ 
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
@@ -24,14 +24,14 @@ import com.xiaomi.data.push.context.AgentContext;
 import com.xiaomi.data.push.rpc.RpcServer;
 import com.xiaomi.data.push.rpc.netty.AgentChannel;
 import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
-import org.apache.ozhera.log.api.model.vo.LogCmd;
-import org.apache.ozhera.log.api.service.PublishConfigService;
-import org.apache.ozhera.log.utils.NetUtil;
 import com.xiaomi.youpin.docean.plugin.dubbo.anno.Service;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
+import org.apache.ozhera.log.api.model.vo.LogCmd;
+import org.apache.ozhera.log.api.service.PublishConfigService;
+import org.apache.ozhera.log.utils.NetUtil;
 
 import javax.annotation.Resource;
 import java.util.*;
@@ -56,6 +56,26 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
     @Resource
     private RpcServer rpcServer;
 
+    private static final String CONFIG_COMPRESS_KEY = 
"CONFIG_COMPRESS_ENABLED";
+
+    private volatile boolean configCompressValue = false;
+
+    public void init() {
+        String raw = System.getenv(CONFIG_COMPRESS_KEY);
+        if (StringUtils.isBlank(raw)) {
+            raw = System.getProperty(CONFIG_COMPRESS_KEY);
+        }
+        if (StringUtils.isNotBlank(raw)) {
+            try {
+                configCompressValue = Boolean.parseBoolean(raw);
+                log.info("configCompressValue {}", configCompressValue);
+            } catch (Exception e) {
+                log.error("parse {} error,use default value:{},config 
value:{}", CONFIG_COMPRESS_KEY, configCompressValue, raw);
+            }
+        }
+    }
+
+
     /**
      * dubbo interface, the timeout period cannot be too long
      *
@@ -73,6 +93,11 @@ public class DefaultPublishConfigService implements 
PublishConfigService {
                 if 
(CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) {
                     RemotingCommand req = 
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
                     req.setBody(sendStr.getBytes());
+
+                    if (configCompressValue) {
+                        req.enableCompression();
+                    }
+
                     log.info("Send the configuration,agent ip:{},Configuration 
information:{}", agentCurrentIp, sendStr);
                     Stopwatch started = Stopwatch.createStarted();
                     RemotingCommand res = 
rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000);
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index 617f62b4..5eed46e5 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>log-agent</artifactId>
-    <version>2.2.10-SNAPSHOT</version>
+    <version>2.2.11-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>21</maven.compiler.source>
@@ -45,6 +45,7 @@ http://www.apache.org/licenses/LICENSE-2.0
         <dependency>
             <groupId>run.mone</groupId>
             <artifactId>rpc</artifactId>
+            <version>1.6.4-jdk21-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>ch.qos.logback</groupId>
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index 1d29f573..e339b6a9 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -93,6 +93,14 @@ public class ChannelEngine {
 
     private final Gson gson = GSON;
 
+    private static final String PROGRESS_ENV_KEY = 
"CHANNEL_STATE_PERIOD_SECONDS";
+
+    private static final String COMPRESS_KEY = 
"CHANNEL_STATE_COMPRESS_ENABLED";
+
+    private static final long DEFAULT_PERIOD_SECONDS = 10L;
+
+    private volatile boolean progressCompressValue = false;
+
     @Getter
     private volatile boolean initComplete;
 
@@ -138,6 +146,7 @@ public class ChannelEngine {
             log.info("current channelDefineList:{},current 
channelServiceList:{}", gson.toJson(this.channelDefineList), 
gson.toJson(this.channelServiceList.stream().map(ChannelService::instanceId).collect(Collectors.toList())));
             monitorFilesClean();
             executorFileClean();
+            resolveCompressEnabled();
         } catch (Exception e) {
             log.error("ChannelEngine init exception", e);
         } finally {
@@ -145,6 +154,21 @@ public class ChannelEngine {
         }
     }
 
+    private void resolveCompressEnabled() {
+        String raw = System.getenv(COMPRESS_KEY);
+        if (StringUtils.isBlank(raw)) {
+            raw = System.getProperty(COMPRESS_KEY);
+        }
+        if (StringUtils.isNotBlank(raw)) {
+            try {
+                progressCompressValue = Boolean.parseBoolean(raw);
+                log.info("progressCompressValue {}", progressCompressValue);
+            } catch (Exception e) {
+                log.error("parse {} error,use default value:{},config 
value:{}", COMPRESS_KEY, progressCompressValue, raw);
+            }
+        }
+    }
+
     /**
      * Thread pool cleaning, many wasted files don't need to keep wasting 
threads, they should be cleaned up directly.
      */
@@ -200,13 +224,34 @@ public class ChannelEngine {
     }
 
     private void exportChannelState() {
+        final long period = resolvePeriodSeconds();
+        log.info("exportChannelState schedule period = {}s", period);
+
         ExecutorUtil.scheduleAtFixedRate(() -> {
             SafeRun.run(() -> {
                 List<ChannelState> channelStateList = 
channelServiceList.stream().map(c -> c.state()).collect(Collectors.toList());
                 // Send the collection progress
                 sendCollectionProgress(channelStateList);
             });
-        }, 10, 10, TimeUnit.SECONDS);
+        }, 10, period, TimeUnit.SECONDS);
+    }
+
+    private long resolvePeriodSeconds() {
+        String raw = System.getenv(PROGRESS_ENV_KEY);
+        if (StringUtils.isBlank(raw)) {
+            raw = System.getProperty(PROGRESS_ENV_KEY);
+        }
+
+        if (StringUtils.isBlank(raw)) {
+            return DEFAULT_PERIOD_SECONDS;
+        }
+
+        try {
+            return Long.parseLong(raw.trim());
+        } catch (NumberFormatException e) {
+            log.warn("Invalid {} value '{}', fallback to {}s", 
PROGRESS_ENV_KEY, raw, DEFAULT_PERIOD_SECONDS);
+            return DEFAULT_PERIOD_SECONDS;
+        }
     }
 
     private List<Long> channelStart(List<ChannelService> channelServiceList) {
@@ -634,6 +679,12 @@ public class ChannelEngine {
         UpdateLogProcessCmd processCmd = 
assembleLogProcessData(channelStateList);
         RpcClient rpcClient = Ioc.ins().getBean(RpcClient.class);
         RemotingCommand req = 
RemotingCommand.createRequestCommand(Constant.RPCCMD_AGENT_CODE);
+
+        if (progressCompressValue) {
+            // enable collection progress compression
+            req.enableCompression();
+        }
+
         req.setBody(GSON.toJson(processCmd).getBytes());
         rpcClient.sendToAllMessage(req);
         log.debug("send collect progress,data:{}", gson.toJson(processCmd));
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
index 37714d80..709e1259 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
@@ -50,7 +50,7 @@ public class LogProcessor implements NettyRequestProcessor {
     public RemotingCommand processRequest(ChannelHandlerContext 
channelHandlerContext, RemotingCommand remotingCommand) throws Exception {
         LogCollectMeta req = remotingCommand.getReq(LogCollectMeta.class);
 
-        log.info("logCollect config req:{}", GSON.toJson(req));
+        log.info("ca:{}", GSON.toJson(req));
 
         RemotingCommand response = 
RemotingCommand.createResponseCommand(LogCmd.LOG_REQ);
         response.setBody("ok".getBytes());
diff --git 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
index 37ad68b1..9af684b4 100644
--- 
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
+++ 
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
@@ -61,9 +61,9 @@ public class PingTask extends Task {
                 String message = load.get() ? "ping:" + 
System.currentTimeMillis() : "load";
                 ping.setMessage(message);
 
-                RemotingCommand req = 
RemotingCommand.createRequestCommand(RpcCmd.pingReq);
-                req.setBody(GSON.toJson(ping).getBytes());
                 for (String service : client.getServerList().get()) {
+                    RemotingCommand req = 
RemotingCommand.createRequestCommand(RpcCmd.pingReq);
+                    req.setBody(GSON.toJson(ping).getBytes());
                     client.sendMessage(service, req, PingTask::handleResponse);
                 }
                 RpcClient.startLatch.countDown();
diff --git a/ozhera-log/log-common/pom.xml b/ozhera-log/log-common/pom.xml
index 8238f903..6e61f6d6 100644
--- a/ozhera-log/log-common/pom.xml
+++ b/ozhera-log/log-common/pom.xml
@@ -27,7 +27,7 @@ http://www.apache.org/licenses/LICENSE-2.0
     </parent>
 
     <artifactId>log-common</artifactId>
-    <version>2.2.7-SNAPSHOT</version>
+    <version>2.2.8-SNAPSHOT</version>
     <modelVersion>4.0.0</modelVersion>
 
     <properties>
diff --git 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
index 86f213be..d1c9d670 100644
--- 
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
+++ 
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
@@ -163,8 +163,12 @@ public class PlaceholderParser extends AbstractLogParser {
                     }
 
                     // 丢弃字段
-                    if (!"-".equals(part.modifier) && fieldIndex < 
fieldNames.size()) {
-                        result.put(fieldNames.get(fieldIndex++), fieldValue);
+                    if (!"-".equals(part.modifier) && (fieldIndex < 
fieldNames.size() || fieldNames.isEmpty())) {
+                        if (!fieldNames.isEmpty()) {
+                            result.put(fieldNames.get(fieldIndex++), 
fieldValue);
+                        } else {
+                            result.put("field" + fieldIndex++, fieldValue);
+                        }
                     }
 
                 } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to