This is an automated email from the ASF dual-hosted git repository.
gaoxihui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 86b5eea3 feat: support configuring compression and export cycles -
added environment variables (#611)
86b5eea3 is described below
commit 86b5eea3214d8a40e7936e45ad280c871ce3530e
Author: wtt <[email protected]>
AuthorDate: Wed Nov 12 16:07:23 2025 +0800
feat: support configuring compression and export cycles - added environment
variables (#611)
---
ozhera-log/log-agent-server/pom.xml | 3 +-
.../service/DefaultPublishConfigService.java | 33 ++++++++++++--
ozhera-log/log-agent/pom.xml | 3 +-
.../ozhera/log/agent/channel/ChannelEngine.java | 53 +++++++++++++++++++++-
.../log/agent/rpc/processor/LogProcessor.java | 2 +-
.../apache/ozhera/log/agent/rpc/task/PingTask.java | 4 +-
ozhera-log/log-common/pom.xml | 2 +-
.../apache/ozhera/log/parse/PlaceholderParser.java | 8 +++-
8 files changed, 95 insertions(+), 13 deletions(-)
diff --git a/ozhera-log/log-agent-server/pom.xml
b/ozhera-log/log-agent-server/pom.xml
index a60334ba..5c1f9796 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -26,7 +26,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<version>2.2.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <version>2.2.7-SNAPSHOT</version>
+ <version>2.2.8-SNAPSHOT</version>
<artifactId>log-agent-server</artifactId>
@@ -66,6 +66,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<dependency>
<groupId>run.mone</groupId>
<artifactId>rpc</artifactId>
+ <version>1.6.4-jdk21-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
diff --git
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
index e642988a..ff97e622 100644
---
a/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
+++
b/ozhera-log/log-agent-server/src/main/java/org/apache/ozhera/log/server/service/DefaultPublishConfigService.java
@@ -24,14 +24,14 @@ import com.xiaomi.data.push.context.AgentContext;
import com.xiaomi.data.push.rpc.RpcServer;
import com.xiaomi.data.push.rpc.netty.AgentChannel;
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
-import org.apache.ozhera.log.api.model.vo.LogCmd;
-import org.apache.ozhera.log.api.service.PublishConfigService;
-import org.apache.ozhera.log.utils.NetUtil;
import com.xiaomi.youpin.docean.plugin.dubbo.anno.Service;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.ozhera.log.api.model.meta.LogCollectMeta;
+import org.apache.ozhera.log.api.model.vo.LogCmd;
+import org.apache.ozhera.log.api.service.PublishConfigService;
+import org.apache.ozhera.log.utils.NetUtil;
import javax.annotation.Resource;
import java.util.*;
@@ -56,6 +56,26 @@ public class DefaultPublishConfigService implements
PublishConfigService {
@Resource
private RpcServer rpcServer;
+ private static final String CONFIG_COMPRESS_KEY =
"CONFIG_COMPRESS_ENABLED";
+
+ private volatile boolean configCompressValue = false;
+
+ public void init() {
+ String raw = System.getenv(CONFIG_COMPRESS_KEY);
+ if (StringUtils.isBlank(raw)) {
+ raw = System.getProperty(CONFIG_COMPRESS_KEY);
+ }
+ if (StringUtils.isNotBlank(raw)) {
+ try {
+ configCompressValue = Boolean.parseBoolean(raw);
+ log.info("configCompressValue {}", configCompressValue);
+ } catch (Exception e) {
+ log.error("parse {} error,use default value:{},config
value:{}", CONFIG_COMPRESS_KEY, configCompressValue, raw);
+ }
+ }
+ }
+
+
/**
* dubbo interface, the timeout period cannot be too long
*
@@ -73,6 +93,11 @@ public class DefaultPublishConfigService implements
PublishConfigService {
if
(CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) {
RemotingCommand req =
RemotingCommand.createRequestCommand(LogCmd.LOG_REQ);
req.setBody(sendStr.getBytes());
+
+ if (configCompressValue) {
+ req.enableCompression();
+ }
+
log.info("Send the configuration,agent ip:{},Configuration
information:{}", agentCurrentIp, sendStr);
Stopwatch started = Stopwatch.createStarted();
RemotingCommand res =
rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000);
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index 617f62b4..5eed46e5 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -28,7 +28,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<modelVersion>4.0.0</modelVersion>
<artifactId>log-agent</artifactId>
- <version>2.2.10-SNAPSHOT</version>
+ <version>2.2.11-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
@@ -45,6 +45,7 @@ http://www.apache.org/licenses/LICENSE-2.0
<dependency>
<groupId>run.mone</groupId>
<artifactId>rpc</artifactId>
+ <version>1.6.4-jdk21-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
index 1d29f573..e339b6a9 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/channel/ChannelEngine.java
@@ -93,6 +93,14 @@ public class ChannelEngine {
private final Gson gson = GSON;
+ private static final String PROGRESS_ENV_KEY =
"CHANNEL_STATE_PERIOD_SECONDS";
+
+ private static final String COMPRESS_KEY =
"CHANNEL_STATE_COMPRESS_ENABLED";
+
+ private static final long DEFAULT_PERIOD_SECONDS = 10L;
+
+ private volatile boolean progressCompressValue = false;
+
@Getter
private volatile boolean initComplete;
@@ -138,6 +146,7 @@ public class ChannelEngine {
log.info("current channelDefineList:{},current
channelServiceList:{}", gson.toJson(this.channelDefineList),
gson.toJson(this.channelServiceList.stream().map(ChannelService::instanceId).collect(Collectors.toList())));
monitorFilesClean();
executorFileClean();
+ resolveCompressEnabled();
} catch (Exception e) {
log.error("ChannelEngine init exception", e);
} finally {
@@ -145,6 +154,21 @@ public class ChannelEngine {
}
}
+ private void resolveCompressEnabled() {
+ String raw = System.getenv(COMPRESS_KEY);
+ if (StringUtils.isBlank(raw)) {
+ raw = System.getProperty(COMPRESS_KEY);
+ }
+ if (StringUtils.isNotBlank(raw)) {
+ try {
+ progressCompressValue = Boolean.parseBoolean(raw);
+ log.info("progressCompressValue {}", progressCompressValue);
+ } catch (Exception e) {
+ log.error("parse {} error,use default value:{},config
value:{}", COMPRESS_KEY, progressCompressValue, raw);
+ }
+ }
+ }
+
/**
* Thread pool cleaning, many wasted files don't need to keep wasting
threads, they should be cleaned up directly.
*/
@@ -200,13 +224,34 @@ public class ChannelEngine {
}
private void exportChannelState() {
+ final long period = resolvePeriodSeconds();
+ log.info("exportChannelState schedule period = {}s", period);
+
ExecutorUtil.scheduleAtFixedRate(() -> {
SafeRun.run(() -> {
List<ChannelState> channelStateList =
channelServiceList.stream().map(c -> c.state()).collect(Collectors.toList());
// Send the collection progress
sendCollectionProgress(channelStateList);
});
- }, 10, 10, TimeUnit.SECONDS);
+ }, 10, period, TimeUnit.SECONDS);
+ }
+
+ private long resolvePeriodSeconds() {
+ String raw = System.getenv(PROGRESS_ENV_KEY);
+ if (StringUtils.isBlank(raw)) {
+ raw = System.getProperty(PROGRESS_ENV_KEY);
+ }
+
+ if (StringUtils.isBlank(raw)) {
+ return DEFAULT_PERIOD_SECONDS;
+ }
+
+ try {
+ return Long.parseLong(raw.trim());
+ } catch (NumberFormatException e) {
+ log.warn("Invalid {} value '{}', fallback to {}s",
PROGRESS_ENV_KEY, raw, DEFAULT_PERIOD_SECONDS);
+ return DEFAULT_PERIOD_SECONDS;
+ }
}
private List<Long> channelStart(List<ChannelService> channelServiceList) {
@@ -634,6 +679,12 @@ public class ChannelEngine {
UpdateLogProcessCmd processCmd =
assembleLogProcessData(channelStateList);
RpcClient rpcClient = Ioc.ins().getBean(RpcClient.class);
RemotingCommand req =
RemotingCommand.createRequestCommand(Constant.RPCCMD_AGENT_CODE);
+
+ if (progressCompressValue) {
+ // enable collection progress compression
+ req.enableCompression();
+ }
+
req.setBody(GSON.toJson(processCmd).getBytes());
rpcClient.sendToAllMessage(req);
log.debug("send collect progress,data:{}", gson.toJson(processCmd));
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
index 37714d80..709e1259 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/processor/LogProcessor.java
@@ -50,7 +50,7 @@ public class LogProcessor implements NettyRequestProcessor {
public RemotingCommand processRequest(ChannelHandlerContext
channelHandlerContext, RemotingCommand remotingCommand) throws Exception {
LogCollectMeta req = remotingCommand.getReq(LogCollectMeta.class);
- log.info("logCollect config req:{}", GSON.toJson(req));
+ log.info("ca:{}", GSON.toJson(req));
RemotingCommand response =
RemotingCommand.createResponseCommand(LogCmd.LOG_REQ);
response.setBody("ok".getBytes());
diff --git
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
index 37ad68b1..9af684b4 100644
---
a/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
+++
b/ozhera-log/log-agent/src/main/java/org/apache/ozhera/log/agent/rpc/task/PingTask.java
@@ -61,9 +61,9 @@ public class PingTask extends Task {
String message = load.get() ? "ping:" +
System.currentTimeMillis() : "load";
ping.setMessage(message);
- RemotingCommand req =
RemotingCommand.createRequestCommand(RpcCmd.pingReq);
- req.setBody(GSON.toJson(ping).getBytes());
for (String service : client.getServerList().get()) {
+ RemotingCommand req =
RemotingCommand.createRequestCommand(RpcCmd.pingReq);
+ req.setBody(GSON.toJson(ping).getBytes());
client.sendMessage(service, req, PingTask::handleResponse);
}
RpcClient.startLatch.countDown();
diff --git a/ozhera-log/log-common/pom.xml b/ozhera-log/log-common/pom.xml
index 8238f903..6e61f6d6 100644
--- a/ozhera-log/log-common/pom.xml
+++ b/ozhera-log/log-common/pom.xml
@@ -27,7 +27,7 @@ http://www.apache.org/licenses/LICENSE-2.0
</parent>
<artifactId>log-common</artifactId>
- <version>2.2.7-SNAPSHOT</version>
+ <version>2.2.8-SNAPSHOT</version>
<modelVersion>4.0.0</modelVersion>
<properties>
diff --git
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
index 86f213be..d1c9d670 100644
---
a/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
+++
b/ozhera-log/log-common/src/main/java/org/apache/ozhera/log/parse/PlaceholderParser.java
@@ -163,8 +163,12 @@ public class PlaceholderParser extends AbstractLogParser {
}
// 丢弃字段
- if (!"-".equals(part.modifier) && fieldIndex <
fieldNames.size()) {
- result.put(fieldNames.get(fieldIndex++), fieldValue);
+ if (!"-".equals(part.modifier) && (fieldIndex <
fieldNames.size() || fieldNames.isEmpty())) {
+ if (!fieldNames.isEmpty()) {
+ result.put(fieldNames.get(fieldIndex++),
fieldValue);
+ } else {
+ result.put("field" + fieldIndex++, fieldValue);
+ }
}
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]