This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8d65089112 [INLONG-8725][DataProxy] Cache file metric output switch value at usage location (#8726)
8d65089112 is described below
commit 8d6508911290a665c962f3535d11eab7318f1f55
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Aug 15 14:09:44 2023 +0800
[INLONG-8725][DataProxy] Cache file metric output switch value at usage location (#8726)
---
.../inlong/dataproxy/sink/common/SinkContext.java | 19 ++++++------
.../dataproxy/sink/mq/SimplePackProfile.java | 34 +++++++++++-----------
.../apache/inlong/dataproxy/source/BaseSource.java | 13 +++++----
3 files changed, 34 insertions(+), 32 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
index 61e901dddd..e426c06cff 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java
@@ -66,6 +66,7 @@ public class SinkContext {
// file metric statistic
protected MonitorIndex monitorIndex = null;
private MonitorStats monitorStats = null;
+ private final boolean enableFileMetric;
/**
* Constructor
@@ -78,6 +79,7 @@ public class SinkContext {
this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL,
100);
this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
+ this.enableFileMetric =
CommonConfigHolder.getInstance().isEnableFileMetric();
//
this.metricItemSet = new DataProxyMetricItemSet(sinkName);
MetricRegister.register(this.metricItemSet);
@@ -88,7 +90,7 @@ public class SinkContext {
*/
public void start() {
// init monitor logic
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
this.monitorIndex = new
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSinkOutName(),
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
@@ -107,7 +109,7 @@ public class SinkContext {
*/
public void close() {
// stop file statistic index
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
if (monitorIndex != null) {
monitorIndex.stop();
}
@@ -118,21 +120,20 @@ public class SinkContext {
}
public void fileMetricIncSumStats(String eventKey) {
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
monitorStats.incSumStats(eventKey);
}
}
public void fileMetricIncWithDetailStats(String eventKey, String
detailInfoKey) {
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
monitorStats.incSumStats(eventKey);
monitorStats.incDetailStats(eventKey + "#" + detailInfoKey);
}
}
public void fileMetricAddSuccStats(PackProfile profile, String topic,
String brokerIP) {
- if (!CommonConfigHolder.getInstance().isEnableFileMetric()
- || !(profile instanceof SimplePackProfile)) {
+ if (!enableFileMetric || !(profile instanceof SimplePackProfile)) {
return;
}
fileMetricIncStats((SimplePackProfile) profile, true,
@@ -140,8 +141,7 @@ public class SinkContext {
}
public void fileMetricAddFailStats(PackProfile profile, String topic,
String brokerIP, String detailKey) {
- if (!CommonConfigHolder.getInstance().isEnableFileMetric()
- || !(profile instanceof SimplePackProfile)) {
+ if (!enableFileMetric || !(profile instanceof SimplePackProfile)) {
return;
}
fileMetricIncStats((SimplePackProfile) profile, false,
@@ -149,8 +149,7 @@ public class SinkContext {
}
public void fileMetricAddExceptStats(PackProfile profile, String topic,
String brokerIP, String detailKey) {
- if (!CommonConfigHolder.getInstance().isEnableFileMetric()
- || !(profile instanceof SimplePackProfile)) {
+ if (!enableFileMetric || !(profile instanceof SimplePackProfile)) {
return;
}
fileMetricIncStats((SimplePackProfile) profile, false,
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index 6d8ecf044a..6b5752a630 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -166,23 +166,23 @@ public class SimplePackProfile extends PackProfile {
* Return response to client in source
*/
private void responseV0Msg(DataProxyErrCode errCode, String errMsg) {
- try {
- String uid = event.getHeaders().get(AttributeConstants.UNIQ_ID);
- if
("false".equals(event.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
- if (logger.isDebugEnabled()) {
- logger.debug("not need to rsp message: seqId = {},
inlongGroupId = {}, inlongStreamId = {}",
- uid, this.getInlongGroupId(),
this.getInlongStreamId());
- }
- return;
+ String uniqId = event.getHeaders().get(AttributeConstants.UNIQ_ID);
+ if
("false".equals(event.getHeaders().get(AttributeConstants.MESSAGE_IS_ACK))) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("not need to rsp message: uniqId = {},
inlongGroupId = {}, inlongStreamId = {}",
+ uniqId, this.getInlongGroupId(),
this.getInlongStreamId());
}
- // check channel status
- if (channel == null || !channel.isWritable()) {
- if (logCounter.shouldPrint()) {
- logger.warn("Prepare send msg but channel full,
msgType={}, attr={}, channel={}",
- msgType, event.getHeaders(), channel);
- }
- return;
+ return;
+ }
+ // check channel status
+ if (channel == null || !channel.isWritable()) {
+ if (logCounter.shouldPrint()) {
+ logger.warn("Prepare send msg but channel full, msgType={},
attr={}, channel={}",
+ msgType, event.getHeaders(), channel);
}
+ return;
+ }
+ try {
// build return attribute string
StringBuilder strBuff = new StringBuilder(512);
if (errCode != DataProxyErrCode.SUCCESS) {
@@ -204,7 +204,7 @@ public class SimplePackProfile extends PackProfile {
// build and send response message
ByteBuf retData;
if (MsgType.MSG_BIN_MULTI_BODY.equals(msgType)) {
- retData =
ServerMessageHandler.buildBinMsgRspPackage(strBuff.toString(),
Long.parseLong(uid));
+ retData =
ServerMessageHandler.buildBinMsgRspPackage(strBuff.toString(),
Long.parseLong(uniqId));
} else {
retData = ServerMessageHandler.buildTxtMsgRspPackage(msgType,
strBuff.toString());
}
@@ -217,7 +217,7 @@ public class SimplePackProfile extends PackProfile {
}
return;
}
- channel.writeAndFlush(strBuff);
+ channel.writeAndFlush(retData);
} catch (Throwable e) {
//
if (logCounter.shouldPrint()) {
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
index 1fa617dc4d..814aae2620 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java
@@ -128,10 +128,13 @@ public abstract class BaseSource
private MonitorStats monitorStats = null;
// metric set
private DataProxyMetricItemSet metricItemSet;
+ // whether enable file metric
+ protected boolean enableFileMetric;
public BaseSource() {
super();
allChannels = new DefaultChannelGroup("DefaultChannelGroup",
GlobalEventExecutor.INSTANCE);
+ this.enableFileMetric =
CommonConfigHolder.getInstance().isEnableFileMetric();
}
@Override
@@ -256,7 +259,7 @@ public abstract class BaseSource
CommonConfigHolder.getInstance().getClusterName(),
this.cachedSrcName, String.valueOf(srcPort));
MetricRegister.register(metricItemSet);
// init monitor logic
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
this.monitorIndex = new
MonitorIndex(CommonConfigHolder.getInstance().getFileMetricSourceOutName(),
CommonConfigHolder.getInstance().getFileMetricStatInvlSec() * 1000L,
CommonConfigHolder.getInstance().getFileMetricStatCacheCnt());
@@ -304,7 +307,7 @@ public abstract class BaseSource
this.workerGroup.shutdownGracefully();
}
// stop file statistic index
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
if (monitorIndex != null) {
monitorIndex.stop();
}
@@ -409,13 +412,13 @@ public abstract class BaseSource
}
public void fileMetricIncSumStats(String eventKey) {
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
monitorStats.incSumStats(eventKey);
}
}
public void fileMetricIncDetailStats(String eventKey) {
- if (CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (enableFileMetric) {
monitorStats.incDetailStats(eventKey);
}
}
@@ -436,7 +439,7 @@ public abstract class BaseSource
private void fileMetricIncStats(StringBuilder strBuff, boolean isSucc,
String groupId,
String streamId, String topicName, String clientIP, String
msgProcType,
long dt, long pkgTime, int cnt, int packCnt, long packSize, int
failCnt) {
- if (!CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ if (!enableFileMetric) {
return;
}
String tenMinsDt = DateTimeUtils.ms2yyyyMMddHHmmTenMins(dt);