This is an automated email from the ASF dual-hosted git repository.
zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git
The following commit(s) were added to refs/heads/master by this push:
new 934e4dea refactor: optimize the `log-stream` Module in Logging (#544)
934e4dea is described below
commit 934e4dea051b0a96ac5ad8223d80b74bea27db56
Author: wtt <[email protected]>
AuthorDate: Sat Feb 8 15:34:39 2025 +0800
refactor: optimize the `log-stream` Module in Logging (#544)
---
.../log/stream/config/MilogConfigListener.java | 66 ++++++++++------------
.../apache/ozhera/log/stream/job/JobManager.java | 53 +++++------------
2 files changed, 44 insertions(+), 75 deletions(-)
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index e9bf5156..f948bcbe 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -23,11 +23,11 @@ import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.anno.Component;
-import com.xiaomi.youpin.docean.common.StringUtils;
import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.ozhera.log.common.Config;
import org.apache.ozhera.log.common.Constant;
import org.apache.ozhera.log.model.LogtailConfig;
@@ -40,8 +40,10 @@ import org.jetbrains.annotations.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static org.apache.ozhera.log.common.Constant.GSON;
@@ -68,10 +70,10 @@ public class MilogConfigListener {
private Map<Long, LogtailConfig> oldLogTailConfigMap = new
ConcurrentHashMap<>();
private Map<Long, SinkConfig> oldSinkConfigMap = new ConcurrentHashMap<>();
- private ReentrantLock buildDataLock = new ReentrantLock();
-
private StreamCommonExtension streamCommonExtension;
+ private volatile String originConfig;
+
public MilogConfigListener(Long spaceId, String dataId, String group,
MilogSpaceData milogSpaceData, NacosConfig nacosConfig) {
this.spaceId = spaceId;
this.dataId = dataId;
@@ -89,38 +91,26 @@ public class MilogConfigListener {
return Ioc.ins().getBean(factualServiceName);
}
- private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData)
throws Exception {
- boolean locked = false;
- try {
- locked = buildDataLock.tryLock(1, TimeUnit.MINUTES);
- if (locked) {
- if (!oldLogTailConfigMap.isEmpty() &&
!oldSinkConfigMap.isEmpty()) {
- List<SinkConfig> sinkConfigs =
newMilogSpaceData.getSpaceConfig();
- stopUnusedOldStoreJobs(sinkConfigs);
- for (SinkConfig sinkConfig : sinkConfigs) {
- stopOldJobsForRemovedTailIds(sinkConfig);
- if
(oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
- //Whether the submission store information
changes, the change stops
- if (!isStoreSame(sinkConfig,
oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) {
- restartPerTail(sinkConfig, milogSpaceData);
- } else {
- handlePerTailComparison(sinkConfig,
milogSpaceData);
- }
- } else {
- newStoreStart(sinkConfig, milogSpaceData);
- }
+ private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) {
+ if (!oldLogTailConfigMap.isEmpty() && !oldSinkConfigMap.isEmpty()) {
+ List<SinkConfig> sinkConfigs = newMilogSpaceData.getSpaceConfig();
+ stopUnusedOldStoreJobs(sinkConfigs);
+ for (SinkConfig sinkConfig : sinkConfigs) {
+ stopOldJobsForRemovedTailIds(sinkConfig);
+ if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
+ //Whether the submission store information changes, the
change stops
+ if (!isStoreSame(sinkConfig,
oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) {
+ restartPerTail(sinkConfig, milogSpaceData);
+ } else {
+ handlePerTailComparison(sinkConfig, milogSpaceData);
}
} else {
- // Restart all
- initNewJob(newMilogSpaceData);
+ newStoreStart(sinkConfig, milogSpaceData);
}
- } else {
- log.warn("handleNacosConfigDataJob lock failed,data:{}",
gson.toJson(newMilogSpaceData));
- }
- } finally {
- if (locked) {
- buildDataLock.unlock();
}
+ } else {
+ // Restart all
+ initNewJob(newMilogSpaceData);
}
}
@@ -283,7 +273,7 @@ public class MilogConfigListener {
private void startTailPer(SinkConfig sinkConfig, LogtailConfig
logTailConfig, Long logSpaceId) {
if (null == logSpaceId || null == logTailConfig || null ==
logTailConfig.getLogtailId()) {
- log.error("logSpaceId or logTailConfig or logTailId is
null,sinkConfig:{},logTailConfig:{},logSpaceId:{}", gson.toJson(sinkConfig),
gson.toJson(logTailConfig), spaceId);
+ log.error("logSpaceId or logTailConfig or logTailId is
null,storeId:{},tailId:{},logSpaceId:{}", sinkConfig.getLogstoreId(),
logTailConfig.getLogtailId(), spaceId);
return;
}
Boolean isStart =
streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig,
logSpaceId);
@@ -291,7 +281,7 @@ public class MilogConfigListener {
log.warn("preCheckTaskExecution error,preCheckTaskExecution is
false,LogTailConfig:{}", gson.toJson(logTailConfig));
return;
}
- log.info("【Listen tail】Initialize the new task, tail
configuration:{},index:{},cluster information:{},spaceId:{}",
gson.toJson(logTailConfig), sinkConfig.getEsIndex(),
gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
+ log.info("Initialize the new task, tail
configuration:{},index:{},cluster information:{},spaceId:{}",
gson.toJson(logTailConfig), sinkConfig.getEsIndex(),
gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
jobManager.startJob(logTailConfig, sinkConfig, logSpaceId);
oldLogTailConfigMap.put(logTailConfig.getLogtailId(), logTailConfig);
}
@@ -309,7 +299,11 @@ public class MilogConfigListener {
@Override
public void receiveConfigInfo(String dataValue) {
try {
- log.info("listen tail received a configuration
request:{},a configuration that already exists:storeMap:{},tailMap:{}",
dataValue, gson.toJson(oldSinkConfigMap), gson.toJson(oldLogTailConfigMap));
+ if (StringUtils.equals(originConfig, dataValue)) {
+ return;
+ }
+ originConfig = dataValue;
+ log.info("listen tail received a configuration
request:{},origin config:{}", dataValue, originConfig);
if (StringUtils.isNotEmpty(dataValue) &&
!Constant.NULLVALUE.equals(dataValue)) {
dataValue =
streamCommonExtension.dataPreProcess(dataValue);
MilogSpaceData newMilogSpaceData =
GSON.fromJson(dataValue, MilogSpaceData.class);
diff --git a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index ac6fa56d..1a9227b1 100644
--- a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -38,8 +38,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -59,10 +57,6 @@ public class JobManager {
private Gson gson = new Gson();
- private ReentrantLock stopLock = new ReentrantLock();
-
- private ReentrantLock startLock = new ReentrantLock();
-
public JobManager() {
sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
sinkChain = Ioc.ins().getBean(SinkChain.class);
@@ -101,22 +95,14 @@ public class JobManager {
}
public void stopJob(LogtailConfig logtailConfig) {
- boolean locked = false;
try {
- locked = stopLock.tryLock(10, TimeUnit.SECONDS);
- if (locked) {
- List<Long> jobKeys =
jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
+ List<Long> jobKeys =
jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
+ if(CollectionUtils.isNotEmpty(jobKeys)){
log.info("【stop job】,all jobs:{}", jobKeys);
sinkJobsShutDown(logtailConfig);
- } else {
- log.warn("【stop job】,other job is running,wait
10s,tailConfig:{}", gson.toJson(logtailConfig));
}
} catch (Exception e) {
log.error(String.format("[JobManager.stopJob] stopJob
err,logtailId:%s", logtailConfig.getLogtailId()), e);
- } finally {
- if (locked) {
- stopLock.unlock();
- }
}
}
@@ -124,7 +110,7 @@ public class JobManager {
logtailConfig, SinkConfig sinkConfig, Long logSpaceId) {
try {
SinkJobConfig sinkJobConfig = buildSinkJobConfig(type, ak, sk,
clusterInfo, logtailConfig, sinkConfig, logSpaceId);
- log.warn("##startConsumerJob## spaceId:{}, storeId:{}, tailId:{}",
sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(),
sinkJobConfig.getLogTailId());
+ log.warn("startConsumerJob spaceId:{}, storeId:{}, tailId:{}",
sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(),
sinkJobConfig.getLogTailId());
String sinkProviderBean = sinkJobConfig.getMqType() +
LogStreamConstants.sinkJobProviderBeanSuffix;
SinkJobProvider sinkJobProvider =
Ioc.ins().getBean(sinkProviderBean);
@@ -149,10 +135,9 @@ public class JobManager {
startSinkJob(sinkJobProvider.getBackupJob(sinkJobConfig),
SinkJobEnum.BACKUP_JOB,
logtailConfig.getLogtailId());
}
-
- log.info(String.format("[JobManager.initJobs] startJob
success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(),
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()));
+ log.info(String.format("startJob
success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(),
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()));
} catch (Throwable e) {
- log.error(String.format("[JobManager.initJobs] startJob
err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(),
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new
RuntimeException(e));
+ log.error(String.format("startJob
err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(),
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new
RuntimeException(e));
}
}
@@ -195,28 +180,18 @@ public class JobManager {
}
public void startJob(LogtailConfig logtailConfig, SinkConfig sinkConfig,
Long logSpaceId) {
- boolean locked = false;
try {
- locked = startLock.tryLock(10, TimeUnit.SECONDS);
- if (locked) {
- String ak = logtailConfig.getAk();
- String sk = logtailConfig.getSk();
- String clusterInfo = logtailConfig.getClusterInfo();
- String type = logtailConfig.getType();
- if (StringUtils.isEmpty(clusterInfo) ||
StringUtils.isEmpty(logtailConfig.getTopic())) {
- log.info("start job error,ak or sk or logtailConfig
null,ak:{},sk:{},logtailConfig:{}", ak, sk, new Gson().toJson(logtailConfig));
- return;
- }
- startConsumerJob(type, ak, sk, clusterInfo, logtailConfig,
sinkConfig, logSpaceId);
- } else {
- log.warn("start job error,lock
timeout,tailConfig:{},sinkConfig:{}", gson.toJson(logtailConfig),
gson.toJson(sinkConfig));
+ String ak = logtailConfig.getAk();
+ String sk = logtailConfig.getSk();
+ String clusterInfo = logtailConfig.getClusterInfo();
+ String type = logtailConfig.getType();
+ if (StringUtils.isEmpty(clusterInfo) ||
StringUtils.isEmpty(logtailConfig.getTopic())) {
+ log.info("start job error,ak or sk or logTailConfig
null,ak:{},sk:{},logTailConfig:{}", ak, sk, gson.toJson(logtailConfig));
+ return;
}
+ startConsumerJob(type, ak, sk, clusterInfo, logtailConfig,
sinkConfig, logSpaceId);
} catch (Exception e) {
- log.error(String.format("[JobManager.startJob] start job
err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e);
- } finally {
- if (locked) {
- startLock.unlock();
- }
+ log.error(String.format("start job
err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]