This is an automated email from the ASF dual-hosted git repository.

zhangxiaowei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozhera.git


The following commit(s) were added to refs/heads/master by this push:
     new 934e4dea refactor: optimize the `log-stream` Module in Logging (#544)
934e4dea is described below

commit 934e4dea051b0a96ac5ad8223d80b74bea27db56
Author: wtt <[email protected]>
AuthorDate: Sat Feb 8 15:34:39 2025 +0800

    refactor: optimize the `log-stream` Module in Logging (#544)
---
 .../log/stream/config/MilogConfigListener.java     | 66 ++++++++++------------
 .../apache/ozhera/log/stream/job/JobManager.java   | 53 +++++------------
 2 files changed, 44 insertions(+), 75 deletions(-)

diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
index e9bf5156..f948bcbe 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/config/MilogConfigListener.java
@@ -23,11 +23,11 @@ import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.xiaomi.youpin.docean.Ioc;
 import com.xiaomi.youpin.docean.anno.Component;
-import com.xiaomi.youpin.docean.common.StringUtils;
 import com.xiaomi.youpin.docean.plugin.nacos.NacosConfig;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.ozhera.log.common.Config;
 import org.apache.ozhera.log.common.Constant;
 import org.apache.ozhera.log.model.LogtailConfig;
@@ -40,8 +40,10 @@ import org.jetbrains.annotations.NotNull;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static org.apache.ozhera.log.common.Constant.GSON;
@@ -68,10 +70,10 @@ public class MilogConfigListener {
     private Map<Long, LogtailConfig> oldLogTailConfigMap = new 
ConcurrentHashMap<>();
     private Map<Long, SinkConfig> oldSinkConfigMap = new ConcurrentHashMap<>();
 
-    private ReentrantLock buildDataLock = new ReentrantLock();
-
     private StreamCommonExtension streamCommonExtension;
 
+    private volatile String originConfig;
+
     public MilogConfigListener(Long spaceId, String dataId, String group, 
MilogSpaceData milogSpaceData, NacosConfig nacosConfig) {
         this.spaceId = spaceId;
         this.dataId = dataId;
@@ -89,38 +91,26 @@ public class MilogConfigListener {
         return Ioc.ins().getBean(factualServiceName);
     }
 
-    private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) 
throws Exception {
-        boolean locked = false;
-        try {
-            locked = buildDataLock.tryLock(1, TimeUnit.MINUTES);
-            if (locked) {
-                if (!oldLogTailConfigMap.isEmpty() && 
!oldSinkConfigMap.isEmpty()) {
-                    List<SinkConfig> sinkConfigs = 
newMilogSpaceData.getSpaceConfig();
-                    stopUnusedOldStoreJobs(sinkConfigs);
-                    for (SinkConfig sinkConfig : sinkConfigs) {
-                        stopOldJobsForRemovedTailIds(sinkConfig);
-                        if 
(oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
-                            //Whether the submission store information 
changes, the change stops
-                            if (!isStoreSame(sinkConfig, 
oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) {
-                                restartPerTail(sinkConfig, milogSpaceData);
-                            } else {
-                                handlePerTailComparison(sinkConfig, 
milogSpaceData);
-                            }
-                        } else {
-                            newStoreStart(sinkConfig, milogSpaceData);
-                        }
+    private void handleNacosConfigDataJob(MilogSpaceData newMilogSpaceData) {
+        if (!oldLogTailConfigMap.isEmpty() && !oldSinkConfigMap.isEmpty()) {
+            List<SinkConfig> sinkConfigs = newMilogSpaceData.getSpaceConfig();
+            stopUnusedOldStoreJobs(sinkConfigs);
+            for (SinkConfig sinkConfig : sinkConfigs) {
+                stopOldJobsForRemovedTailIds(sinkConfig);
+                if (oldSinkConfigMap.containsKey(sinkConfig.getLogstoreId())) {
+                    // If the submitted store configuration has changed, stop the old jobs and restart
+                    if (!isStoreSame(sinkConfig, 
oldSinkConfigMap.get(sinkConfig.getLogstoreId()))) {
+                        restartPerTail(sinkConfig, milogSpaceData);
+                    } else {
+                        handlePerTailComparison(sinkConfig, milogSpaceData);
                     }
                 } else {
-                    // Restart all
-                    initNewJob(newMilogSpaceData);
+                    newStoreStart(sinkConfig, milogSpaceData);
                 }
-            } else {
-                log.warn("handleNacosConfigDataJob lock failed,data:{}", 
gson.toJson(newMilogSpaceData));
-            }
-        } finally {
-            if (locked) {
-                buildDataLock.unlock();
             }
+        } else {
+            // Restart all
+            initNewJob(newMilogSpaceData);
         }
     }
 
@@ -283,7 +273,7 @@ public class MilogConfigListener {
 
     private void startTailPer(SinkConfig sinkConfig, LogtailConfig 
logTailConfig, Long logSpaceId) {
         if (null == logSpaceId || null == logTailConfig || null == 
logTailConfig.getLogtailId()) {
-            log.error("logSpaceId or logTailConfig or logTailId is 
null,sinkConfig:{},logTailConfig:{},logSpaceId:{}", gson.toJson(sinkConfig), 
gson.toJson(logTailConfig), spaceId);
+            log.error("logSpaceId or logTailConfig or logTailId is 
null,storeId:{},tailId:{},logSpaceId:{}", sinkConfig.getLogstoreId(), 
logTailConfig.getLogtailId(), spaceId);
             return;
         }
         Boolean isStart = 
streamCommonExtension.preCheckTaskExecution(sinkConfig, logTailConfig, 
logSpaceId);
@@ -291,7 +281,7 @@ public class MilogConfigListener {
             log.warn("preCheckTaskExecution error,preCheckTaskExecution is 
false,LogTailConfig:{}", gson.toJson(logTailConfig));
             return;
         }
-        log.info("【Listen tail】Initialize the new task, tail 
configuration:{},index:{},cluster information:{},spaceId:{}", 
gson.toJson(logTailConfig), sinkConfig.getEsIndex(), 
gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
+        log.info("Initialize the new task, tail 
configuration:{},index:{},cluster information:{},spaceId:{}", 
gson.toJson(logTailConfig), sinkConfig.getEsIndex(), 
gson.toJson(sinkConfig.getEsInfo()), logSpaceId);
         jobManager.startJob(logTailConfig, sinkConfig, logSpaceId);
         oldLogTailConfigMap.put(logTailConfig.getLogtailId(), logTailConfig);
     }
@@ -309,7 +299,11 @@ public class MilogConfigListener {
             @Override
             public void receiveConfigInfo(String dataValue) {
                 try {
-                    log.info("listen tail received a configuration 
request:{},a configuration that already exists:storeMap:{},tailMap:{}", 
dataValue, gson.toJson(oldSinkConfigMap), gson.toJson(oldLogTailConfigMap));
+                    if (StringUtils.equals(originConfig, dataValue)) {
+                        return;
+                    }
+                    originConfig = dataValue;
+                    log.info("listen tail received a configuration 
request:{},origin config:{}", dataValue, originConfig);
                     if (StringUtils.isNotEmpty(dataValue) && 
!Constant.NULLVALUE.equals(dataValue)) {
                         dataValue = 
streamCommonExtension.dataPreProcess(dataValue);
                         MilogSpaceData newMilogSpaceData = 
GSON.fromJson(dataValue, MilogSpaceData.class);
diff --git 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
index ac6fa56d..1a9227b1 100644
--- 
a/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
+++ 
b/ozhera-log/log-stream/src/main/java/org/apache/ozhera/log/stream/job/JobManager.java
@@ -38,8 +38,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 
@@ -59,10 +57,6 @@ public class JobManager {
 
     private Gson gson = new Gson();
 
-    private ReentrantLock stopLock = new ReentrantLock();
-
-    private ReentrantLock startLock = new ReentrantLock();
-
     public JobManager() {
         sinkJobType = Config.ins().get(SINK_JOB_TYPE_KEY, "");
         sinkChain = Ioc.ins().getBean(SinkChain.class);
@@ -101,22 +95,14 @@ public class JobManager {
     }
 
     public void stopJob(LogtailConfig logtailConfig) {
-        boolean locked = false;
         try {
-            locked = stopLock.tryLock(10, TimeUnit.SECONDS);
-            if (locked) {
-                List<Long> jobKeys = 
jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
+            List<Long> jobKeys = 
jobs.entrySet().stream().map(Map.Entry::getKey).collect(Collectors.toList());
+            if(CollectionUtils.isNotEmpty(jobKeys)){
                 log.info("【stop job】,all jobs:{}", jobKeys);
                 sinkJobsShutDown(logtailConfig);
-            } else {
-                log.warn("【stop job】,other job is running,wait 
10s,tailConfig:{}", gson.toJson(logtailConfig));
             }
         } catch (Exception e) {
             log.error(String.format("[JobManager.stopJob] stopJob 
err,logtailId:%s", logtailConfig.getLogtailId()), e);
-        } finally {
-            if (locked) {
-                stopLock.unlock();
-            }
         }
     }
 
@@ -124,7 +110,7 @@ public class JobManager {
             logtailConfig, SinkConfig sinkConfig, Long logSpaceId) {
         try {
             SinkJobConfig sinkJobConfig = buildSinkJobConfig(type, ak, sk, 
clusterInfo, logtailConfig, sinkConfig, logSpaceId);
-            log.warn("##startConsumerJob## spaceId:{}, storeId:{}, tailId:{}", 
sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(), 
sinkJobConfig.getLogTailId());
+            log.warn("startConsumerJob spaceId:{}, storeId:{}, tailId:{}", 
sinkJobConfig.getLogSpaceId(), sinkJobConfig.getLogStoreId(), 
sinkJobConfig.getLogTailId());
 
             String sinkProviderBean = sinkJobConfig.getMqType() + 
LogStreamConstants.sinkJobProviderBeanSuffix;
             SinkJobProvider sinkJobProvider = 
Ioc.ins().getBean(sinkProviderBean);
@@ -149,10 +135,9 @@ public class JobManager {
                 startSinkJob(sinkJobProvider.getBackupJob(sinkJobConfig), 
SinkJobEnum.BACKUP_JOB,
                         logtailConfig.getLogtailId());
             }
-
-            log.info(String.format("[JobManager.initJobs] startJob 
success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), 
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()));
+            log.info(String.format("startJob 
success,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), 
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()));
         } catch (Throwable e) {
-            log.error(String.format("[JobManager.initJobs] startJob 
err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), 
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new 
RuntimeException(e));
+            log.error(String.format("startJob 
err,logTailId:%s,topic:%s,tag:%s,esIndex:%s", logtailConfig.getLogtailId(), 
logtailConfig.getTopic(), logtailConfig.getTag(), sinkConfig.getEsIndex()), new 
RuntimeException(e));
         }
     }
 
@@ -195,28 +180,18 @@ public class JobManager {
     }
 
     public void startJob(LogtailConfig logtailConfig, SinkConfig sinkConfig, 
Long logSpaceId) {
-        boolean locked = false;
         try {
-            locked = startLock.tryLock(10, TimeUnit.SECONDS);
-            if (locked) {
-                String ak = logtailConfig.getAk();
-                String sk = logtailConfig.getSk();
-                String clusterInfo = logtailConfig.getClusterInfo();
-                String type = logtailConfig.getType();
-                if (StringUtils.isEmpty(clusterInfo) || 
StringUtils.isEmpty(logtailConfig.getTopic())) {
-                    log.info("start job error,ak or sk or logtailConfig 
null,ak:{},sk:{},logtailConfig:{}", ak, sk, new Gson().toJson(logtailConfig));
-                    return;
-                }
-                startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, 
sinkConfig, logSpaceId);
-            } else {
-                log.warn("start job error,lock 
timeout,tailConfig:{},sinkConfig:{}", gson.toJson(logtailConfig), 
gson.toJson(sinkConfig));
+            String ak = logtailConfig.getAk();
+            String sk = logtailConfig.getSk();
+            String clusterInfo = logtailConfig.getClusterInfo();
+            String type = logtailConfig.getType();
+            if (StringUtils.isEmpty(clusterInfo) || 
StringUtils.isEmpty(logtailConfig.getTopic())) {
+                log.info("start job error,ak or sk or logTailConfig 
null,ak:{},sk:{},logTailConfig:{}", ak, sk, gson.toJson(logtailConfig));
+                return;
             }
+            startConsumerJob(type, ak, sk, clusterInfo, logtailConfig, 
sinkConfig, logSpaceId);
         } catch (Exception e) {
-            log.error(String.format("[JobManager.startJob] start job 
err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e);
-        } finally {
-            if (locked) {
-                startLock.unlock();
-            }
+            log.error(String.format("start job 
err,logTailConfig:%s,esIndex:%s", logtailConfig, sinkConfig.getEsIndex()), e);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to