This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d65089112 [INLONG-8725][DataProxy] Cache file metric output switch value at usage location (#8726)
8d65089112 is described below

commit 8d6508911290a665c962f3535d11eab7318f1f55
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Aug 15 14:09:44 2023 +0800

    [INLONG-8725][DataProxy] Cache file metric output switch value at usage location (#8726)
---
 .../inlong/dataproxy/sink/common/SinkContext.java  | 19 ++++++------
 .../dataproxy/sink/mq/SimplePackProfile.java       | 34 +++++++++++-----------
 .../apache/inlong/dataproxy/source/BaseSource.java | 13 +++++----
 3 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 61e901dddd..e426c06cff 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -66,6 +66,7 @@ public class SinkContext {
     // file metric statistic
     protected MonitorIndex monitorIndex = null;
     private MonitorStats monitorStats = null;
+    private final boolean enableFileMetric;
 
     /**
      * Constructor
@@ -78,6 +79,7 @@ public class SinkContext {
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
         this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 
100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
+        this.enableFileMetric = 
CommonConfigHolder.getInstance().isEnableFileMetric();
         //
         this.metricItemSet = new DataProxyMetricItemSet(sinkName);
         MetricRegister.register(this.metricItemSet);
@@ -88,7 +90,7 @@ public class SinkContext {
      */
     public void start() {
         // init monitor logic
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             this.monitorIndex = new 
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(),
                     
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
@@ -107,7 +109,7 @@ public class SinkContext {
      */
     public void close() {
         // stop file statistic index
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             if (monitorIndex != null) {
                 monitorIndex.stop();
             }
@@ -118,21 +120,20 @@ public class SinkContext {
     }
 
     public void fileMetricIncSumStats(String eventKey) {
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             monitorStats.incSumStats(eventKey);
         }
     }
 
     public void fileMetricIncWithDetailStats(String eventKey, String 
detailInfoKey) {
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             monitorStats.incSumStats(eventKey);
             monitorStats.incDetailStats(eventKey + "#" + detailInfoKey);
         }
     }
 
     public void fileMetricAddSuccStats(PackProfile profile, String topic, 
String brokerIP) {
-        if (!CommonConfigHolder.getInstance().isEnableFileMetric()
-                || !(profile instanceof SimplePackProfile)) {
+        if (!enableFileMetric || !(profile instanceof SimplePackProfile)) {
             return;
         }
         fileMetricIncStats((SimplePackProfile) profile, true,
@@ -140,8 +141,7 @@ public class SinkContext {
     }
 
     public void fileMetricAddFailStats(PackProfile profile, String topic, 
String brokerIP, String detailKey) {
-        if (!CommonConfigHolder.getInstance().isEnableFileMetric()
-                || !(profile instanceof SimplePackProfile)) {
+        if (!enableFileMetric || !(profile instanceof SimplePackProfile)) {
             return;
         }
         fileMetricIncStats((SimplePackProfile) profile, false,
@@ -149,8 +149,7 @@ public class SinkContext {
     }
 
     public void fileMetricAddExceptStats(PackProfile profile, String topic, 
String brokerIP, String detailKey) {
-        if (!CommonConfigHolder.getInstance().isEnableFileMetric()
-                || !(profile instanceof SimplePackProfile)) {
+        if (!enableFileMetric || !(profile instanceof SimplePackProfile)) {
             return;
         }
         fileMetricIncStats((SimplePackProfile) profile, false,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index 6d8ecf044a..6b5752a630 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -166,23 +166,23 @@ public class SimplePackProfile extends PackProfile {
      *  Return response to client in source
      */
     private void responseV0Msg(DataProxyErrCode errCode, String errMsg) {
-        try {
-            String uid = event.getHeaders().get(AttributeConstants.UNIQ_ID);
-            if 
("false".equals(event.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("not need to rsp message: seqId = {}, 
inlongGroupId = {}, inlongStreamId = {}",
-                            uid, this.getInlongGroupId(), 
this.getInlongStreamId());
-                }
-                return;
+        String uniqId = event.getHeaders().get(AttributeConstants.UNIQ_ID);
+        if 
("false".equals(event.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("not need to rsp message: uniqId = {}, 
inlongGroupId = {}, inlongStreamId = {}",
+                        uniqId, this.getInlongGroupId(), 
this.getInlongStreamId());
             }
-            // check channel status
-            if (channel == null || !channel.isWritable()) {
-                if (logCounter.shouldPrint()) {
-                    logger.warn("Prepare send msg but channel full, 
msgType={}, attr={}, channel={}",
-                            msgType, event.getHeaders(), channel);
-                }
-                return;
+            return;
+        }
+        // check channel status
+        if (channel == null || !channel.isWritable()) {
+            if (logCounter.shouldPrint()) {
+                logger.warn("Prepare send msg but channel full, msgType={}, 
attr={}, channel={}",
+                        msgType, event.getHeaders(), channel);
             }
+            return;
+        }
+        try {
             // build return attribute string
             StringBuilder strBuff = new StringBuilder(512);
             if (errCode != DataProxyErrCode.SUCCESS) {
@@ -204,7 +204,7 @@ public class SimplePackProfile extends PackProfile {
             // build and send response message
             ByteBuf retData;
             if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
-                retData = 
ServerMessageHandler.buildBinMsgRspPackage(strBuff.toString(), 
Long.parseLong(uid));
+                retData = 
ServerMessageHandler.buildBinMsgRspPackage(strBuff.toString(), 
Long.parseLong(uniqId));
             } else {
                 retData = ServerMessageHandler.buildTxtMsgRspPackage(msgType, 
strBuff.toString());
             }
@@ -217,7 +217,7 @@ public class SimplePackProfile extends PackProfile {
                 }
                 return;
             }
-            channel.writeAndFlush(strBuff);
+            channel.writeAndFlush(retData);
         } catch (Throwable e) {
             //
             if (logCounter.shouldPrint()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 1fa617dc4d..814aae2620 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -128,10 +128,13 @@ public abstract class BaseSource
     private MonitorStats monitorStats = null;
     // metric set
     private DataProxyMetricItemSet metricItemSet;
+    // whether enable file metric
+    protected boolean enableFileMetric;
 
     public BaseSource() {
         super();
         allChannels = new DefaultChannelGroup("DefaultChannelGroup", 
GlobalEventExecutor.INSTANCE);
+        this.enableFileMetric = 
CommonConfigHolder.getInstance().isEnableFileMetric();
     }
 
     @Override
@@ -256,7 +259,7 @@ public abstract class BaseSource
                 CommonConfigHolder.getInstance().getClusterName(), 
this.cachedSrcName, String.valueOf(srcPort));
         MetricRegister.register(metricItemSet);
         // init monitor logic
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             this.monitorIndex = new 
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(),
                     
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
                     
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
@@ -304,7 +307,7 @@ public abstract class BaseSource
             this.workerGroup.shutdownGracefully();
         }
         // stop file statistic index
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             if (monitorIndex != null) {
                 monitorIndex.stop();
             }
@@ -409,13 +412,13 @@ public abstract class BaseSource
     }
 
     public void fileMetricIncSumStats(String eventKey) {
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             monitorStats.incSumStats(eventKey);
         }
     }
 
     public void fileMetricIncDetailStats(String eventKey) {
-        if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (enableFileMetric) {
             monitorStats.incDetailStats(eventKey);
         }
     }
@@ -436,7 +439,7 @@ public abstract class BaseSource
     private void fileMetricIncStats(StringBuilder strBuff, boolean isSucc, 
String groupId,
             String streamId, String topicName, String clientIP, String 
msgProcType,
             long dt, long pkgTime, int cnt, int packCnt, long packSize, int 
failCnt) {
-        if (!CommonConfigHolder.getInstance().isEnableFileMetric()) {
+        if (!enableFileMetric) {
             return;
         }
         String tenMinsDt = DateTimeUtils.ms2yyyyMMddHHmmTenMins(dt);

Reply via email to