This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 63c28583ff [INLONG-11743][SDK] Adjustment of metric statistics (#11744)
63c28583ff is described below
commit 63c28583ffe568ae25d21e2e3a860ab44d5926f9
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Feb 11 14:09:13 2025 +0800
[INLONG-11743][SDK] Adjustment of metric statistics (#11744)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/metric/MetaSyncInfo.java | 4 +-
.../sdk/dataproxy/metric/MetricDataHolder.java | 216 +++++++++++++++++----
.../inlong/sdk/dataproxy/metric/TimeCostInfo.java | 4 +-
.../inlong/sdk/dataproxy/metric/TrafficInfo.java | 151 ++++++++++----
.../sdk/dataproxy/network/http/HttpClientMgr.java | 55 +++++-
.../sdk/dataproxy/network/tcp/TcpClientMgr.java | 2 +-
.../sdk/dataproxy/network/tcp/TcpNettyClient.java | 2 +-
.../inlong/sdk/dataproxy/sender/BaseSender.java | 4 +-
.../dataproxy/sender/http/InLongHttpMsgSender.java | 11 +-
.../dataproxy/sender/tcp/InLongTcpMsgSender.java | 26 ++-
10 files changed, 367 insertions(+), 108 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
index 2539c7ae53..b574eff2ac 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetaSyncInfo.java
@@ -46,12 +46,12 @@ public class MetaSyncInfo {
public void getAndResetValue(StringBuilder strBuff) {
if (syncErrInfo.isEmpty()) {
- strBuff.append("\"mSync\":{\"errT\":{},");
+ strBuff.append("\"ms\":{\"errT\":{},");
syncCostMs.getAndResetValue(strBuff);
strBuff.append("}");
} else {
long curCnt = 0;
- strBuff.append("\"mSync\":{\"errT\":{");
+ strBuff.append("\"ms\":{\"errT\":{");
for (Map.Entry<Integer, LongAdder> entry : syncErrInfo.entrySet())
{
if (curCnt++ > 0) {
strBuff.append(",");
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 9b8e19560c..4b16f62c8d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -40,7 +40,7 @@ public class MetricDataHolder implements Runnable {
private static final String DEFAULT_KEY_SPLITTER = "#";
private static final Logger logger =
LoggerFactory.getLogger(MetricDataHolder.class);
- private static final LogCounter exceptCnt = new LogCounter(10, 100000, 60
* 1000L);
+ private static final LogCounter exceptCnt = new LogCounter(5, 100000, 60 *
1000L);
private final MetricConfig metricConfig;
private final BaseSender sender;
@@ -95,7 +95,7 @@ public class MetricDataHolder implements Runnable {
}
public void addMetaSyncMetric(int errCode, long syncCostMs) {
- if (!this.started || !this.metricConfig.isEnableMetric()) {
+ if (!this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
@@ -107,28 +107,56 @@ public class MetricDataHolder implements Runnable {
}
}
- public void addSucMetric(String groupId, String streamId, int msgCnt, long
costMs) {
- if (!this.started || !this.metricConfig.isEnableMetric()) {
+ public void addSyncSucMetric(String groupId, String streamId, int msgCnt,
long costMs) {
+ if (!this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
- selectedUnit.addSucMsgInfo(groupId,
+ selectedUnit.addSyncSendSucInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt, costMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
}
}
- public void addFailMetric(int errCode, String groupId, String streamId,
int msgCnt) {
- if (!this.started || !this.metricConfig.isEnableMetric()) {
+ public void addSyncFailMetric(int errCode, String groupId, String
streamId, int msgCnt) {
+ if (!this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
- selectedUnit.addFailMsgInfo(groupId,
+ selectedUnit.addSyncSendFailInfo(groupId,
+ (this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt, errCode);
+ } finally {
+ selectedUnit.refCnt.decrementAndGet();
+ }
+ }
+
+ public void addAsyncSucReqMetric(String groupId, String streamId, int
msgCnt) {
+ if (!this.metricConfig.isEnableMetric()) {
+ return;
+ }
+ MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+ selectedUnit.refCnt.incrementAndGet();
+ try {
+ selectedUnit.addAsyncSendSucInfo(groupId,
+ (this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt);
+ } finally {
+ selectedUnit.refCnt.decrementAndGet();
+ }
+ }
+
+ public void addAsyncFailReqMetric(int errCode, String groupId, String
streamId, int msgCnt) {
+ if (!this.metricConfig.isEnableMetric()) {
+ return;
+ }
+ MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+ selectedUnit.refCnt.incrementAndGet();
+ try {
+ selectedUnit.addAsyncSendFailInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt, errCode);
} finally {
selectedUnit.refCnt.decrementAndGet();
@@ -136,13 +164,13 @@ public class MetricDataHolder implements Runnable {
}
public void addCallbackSucMetric(String groupId, String streamId, int
msgCnt, long costMs, long callDurMs) {
- if (!this.started || !this.metricConfig.isEnableMetric()) {
+ if (!this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
- selectedUnit.addSucMsgInfo(groupId,
+ selectedUnit.addAsyncRspSucInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt, costMs, callDurMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
@@ -150,19 +178,61 @@ public class MetricDataHolder implements Runnable {
}
public void addCallbackFailMetric(int errCode, String groupId, String
streamId, int msgCnt, long costMs) {
- if (!this.started || !this.metricConfig.isEnableMetric()) {
+ if (!this.metricConfig.isEnableMetric()) {
return;
}
MetricInfoUnit selectedUnit = metricUnits[itemIndex];
selectedUnit.refCnt.incrementAndGet();
try {
- selectedUnit.addFailMsgInfo(groupId,
+ selectedUnit.addAsyncRspFailInfo(groupId,
(this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt, errCode, costMs);
} finally {
selectedUnit.refCnt.decrementAndGet();
}
}
+ public void addAsyncHttpSucPutMetric(String groupId, String streamId, int
msgCnt) {
+ if (!this.metricConfig.isEnableMetric()) {
+ return;
+ }
+ MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+ selectedUnit.refCnt.incrementAndGet();
+ try {
+ selectedUnit.addAsyncHttpPutSucInfo(groupId,
+ (this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt);
+ } finally {
+ selectedUnit.refCnt.decrementAndGet();
+ }
+ }
+
+ public void addAsyncHttpFailPutMetric(int errCode, String groupId, String
streamId, int msgCnt) {
+ if (!this.metricConfig.isEnableMetric()) {
+ return;
+ }
+ MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+ selectedUnit.refCnt.incrementAndGet();
+ try {
+ selectedUnit.addAsyncHttpPutFailInfo(groupId,
+ (this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt, errCode);
+ } finally {
+ selectedUnit.refCnt.decrementAndGet();
+ }
+ }
+
+ public void addAsyncHttpSucGetMetric(String groupId, String streamId, int
msgCnt) {
+ if (!this.metricConfig.isEnableMetric()) {
+ return;
+ }
+ MetricInfoUnit selectedUnit = metricUnits[itemIndex];
+ selectedUnit.refCnt.incrementAndGet();
+ try {
+ selectedUnit.addAsyncHttpGetSucInfo(groupId,
+ (this.metricConfig.isMaskStreamId() ? "" : streamId),
msgCnt);
+ } finally {
+ selectedUnit.refCnt.decrementAndGet();
+ }
+ }
+
private void outputMetricData(boolean forceOutput, long reportTime, int
readIndex) {
if (!this.metricConfig.isEnableMetric()) {
return;
@@ -180,11 +250,7 @@ public class MetricDataHolder implements Runnable {
|| (System.currentTimeMillis() - startTime >= 5000L)) {
break;
}
- try {
- Thread.sleep(3);
- } catch (InterruptedException e) {
- break;
- }
+ ProxyUtils.sleepSomeTime(80);
} while (selectedUnit.refCnt.get() > 0);
if (!forceOutput && !this.started) {
logger.info("Metric DataHolder({}) closed, stop output metric
info",
@@ -218,7 +284,47 @@ public class MetricDataHolder implements Runnable {
protected final ConcurrentHashMap<String, TrafficInfo> trafficMap =
new ConcurrentHashMap<>();
protected final ConcurrentHashMap<Integer, LongAdder> errCodeMap = new
ConcurrentHashMap<>();
- public void addSucMsgInfo(String groupId, String streamId, int msgCnt,
long costMs) {
+ public void addSyncSendSucInfo(String groupId, String streamId, int
msgCnt, long costMs) {
+ String recordKey = getKeyStringByConfig(groupId, streamId);
+ TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+ if (trafficInfo == null) {
+ TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+ trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+ if (trafficInfo == null) {
+ trafficInfo = tmpInfo;
+ }
+ }
+ trafficInfo.addSyncSucMsgInfo(msgCnt, costMs);
+ }
+
+ public void addSyncSendFailInfo(String groupId, String streamId, int
msgCnt, int errCode) {
+ String recordKey = getKeyStringByConfig(groupId, streamId);
+ TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+ if (trafficInfo == null) {
+ TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+ trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+ if (trafficInfo == null) {
+ trafficInfo = tmpInfo;
+ }
+ }
+ trafficInfo.addSyncFailMsgInfo(msgCnt);
+ addSendErrCodeInfo(errCode);
+ }
+
+ public void addAsyncSendSucInfo(String groupId, String streamId, int
msgCnt) {
+ String recordKey = getKeyStringByConfig(groupId, streamId);
+ TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+ if (trafficInfo == null) {
+ TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+ trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+ if (trafficInfo == null) {
+ trafficInfo = tmpInfo;
+ }
+ }
+ trafficInfo.addAsyncSucSendInfo(msgCnt);
+ }
+
+ public void addAsyncSendFailInfo(String groupId, String streamId, int
msgCnt, int errCode) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
@@ -228,10 +334,11 @@ public class MetricDataHolder implements Runnable {
trafficInfo = tmpInfo;
}
}
- trafficInfo.addSucMsgInfo(msgCnt, costMs);
+ trafficInfo.addAsyncFailSendInfo(msgCnt);
+ addSendErrCodeInfo(errCode);
}
- public void addSucMsgInfo(String groupId, String streamId, int msgCnt,
long sdCostMs, long cbCostMs) {
+ public void addAsyncHttpPutSucInfo(String groupId, String streamId,
int msgCnt) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
@@ -241,10 +348,10 @@ public class MetricDataHolder implements Runnable {
trafficInfo = tmpInfo;
}
}
- trafficInfo.addSucMsgInfo(msgCnt, sdCostMs, cbCostMs);
+ trafficInfo.addAsyncHttpSucPutInfo(msgCnt);
}
- public void addFailMsgInfo(String groupId, String streamId, int
msgCnt, int errCode) {
+ public void addAsyncHttpPutFailInfo(String groupId, String streamId,
int msgCnt, int errCode) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
@@ -254,12 +361,37 @@ public class MetricDataHolder implements Runnable {
trafficInfo = tmpInfo;
}
}
- trafficInfo.addFailMsgInfo(msgCnt);
+ trafficInfo.addAsyncHttpFailPutInfo(msgCnt);
addSendErrCodeInfo(errCode);
}
- public void addFailMsgInfo(String groupId, String streamId,
- int msgCnt, int errCode, long cbCostMs) {
+ public void addAsyncHttpGetSucInfo(String groupId, String streamId,
int msgCnt) {
+ String recordKey = getKeyStringByConfig(groupId, streamId);
+ TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+ if (trafficInfo == null) {
+ TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+ trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+ if (trafficInfo == null) {
+ trafficInfo = tmpInfo;
+ }
+ }
+ trafficInfo.addAsyncHttpSucGetInfo(msgCnt);
+ }
+
+ public void addAsyncRspSucInfo(String groupId, String streamId, int
msgCnt, long sdCostMs, long cbCostMs) {
+ String recordKey = getKeyStringByConfig(groupId, streamId);
+ TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
+ if (trafficInfo == null) {
+ TrafficInfo tmpInfo = new TrafficInfo(groupId, streamId);
+ trafficInfo = this.trafficMap.putIfAbsent(recordKey, tmpInfo);
+ if (trafficInfo == null) {
+ trafficInfo = tmpInfo;
+ }
+ }
+ trafficInfo.addAsyncSucRspInfo(msgCnt, sdCostMs, cbCostMs);
+ }
+
+ public void addAsyncRspFailInfo(String groupId, String streamId, int
msgCnt, int errCode, long cbCostMs) {
String recordKey = getKeyStringByConfig(groupId, streamId);
TrafficInfo trafficInfo = this.trafficMap.get(recordKey);
if (trafficInfo == null) {
@@ -269,11 +401,11 @@ public class MetricDataHolder implements Runnable {
trafficInfo = tmpInfo;
}
}
- trafficInfo.addFailMsgInfo(msgCnt, cbCostMs);
+ trafficInfo.addAsyncFailRspInfo(msgCnt, cbCostMs);
addSendErrCodeInfo(errCode);
}
- public void addSendErrCodeInfo(int errCode) {
+ private void addSendErrCodeInfo(int errCode) {
LongAdder longCount = this.errCodeMap.get(errCode);
if (longCount == null) {
LongAdder tmpCount = new LongAdder();
@@ -288,7 +420,7 @@ public class MetricDataHolder implements Runnable {
public void getAndResetValue(StringBuilder strBuff) {
int count = 0;
metaSyncInfo.getAndResetValue(strBuff);
- strBuff.append(",\"m\":[");
+ strBuff.append(",\"tr\":[");
for (Map.Entry<String, TrafficInfo> entry : trafficMap.entrySet())
{
if (count++ > 0) {
strBuff.append(",");
@@ -325,9 +457,9 @@ public class MetricDataHolder implements Runnable {
.append(",\"lrT\":").append(lstReportTime)
.append(",");
metricUnit.getAndResetValue(strBuff);
- strBuff.append(",\"s\":{\"tNodes\":").append(sender.getProxyNodeCnt())
- .append(",\"aNodes\":").append(sender.getActiveNodeCnt())
- .append(",\"ifReqs\":").append(sender.getInflightMsgCnt())
+ strBuff.append(",\"s\":{\"tNs\":").append(sender.getProxyNodeCnt())
+ .append(",\"aNs\":").append(sender.getActiveNodeCnt())
+ .append(",\"ifRs\":").append(sender.getInflightMsgCnt())
.append("},\"c\":{\"aC\":").append(sender.getConfigure().getAliveConnections())
.append(",\"rP\":\"").append(sender.getConfigure().getDataRptProtocol())
.append("\",\"rG\":\"").append(sender.getConfigure().getRegionName())
@@ -335,22 +467,22 @@ public class MetricDataHolder implements Runnable {
if (sender instanceof TcpMsgSender) {
TcpMsgSenderConfig tcpConfig = (TcpMsgSenderConfig)
sender.getConfigure();
strBuff.append(",\"mT\":").append(tcpConfig.getSdkMsgType().getValue())
-
.append(",\"comp\":").append(tcpConfig.isEnableDataCompress())
-
.append(",\"mCLen\":").append(tcpConfig.getMinCompEnableLength())
-
.append(",\"lfSep\":").append(tcpConfig.isSeparateEventByLF())
+
.append(",\"cp\":").append(tcpConfig.isEnableDataCompress())
+
.append(",\"mCp\":").append(tcpConfig.getMinCompEnableLength())
+ .append(",\"lf\":").append(tcpConfig.isSeparateEventByLF())
.append(",\"nWk\":").append(tcpConfig.getNettyWorkerThreadNum())
.append(",\"sB\":").append(tcpConfig.getSendBufferSize())
.append(",\"rB\":").append(tcpConfig.getRcvBufferSize())
-
.append(",\"cOut\":").append(tcpConfig.getConnectTimeoutMs())
-
.append(",\"rOut\":").append(tcpConfig.getRequestTimeoutMs())
-
.append(",\"syncOut\":").append(tcpConfig.getMaxAllowedSyncMsgTimeoutCnt());
+
.append(",\"cOt\":").append(tcpConfig.getConnectTimeoutMs())
+
.append(",\"rOt\":").append(tcpConfig.getRequestTimeoutMs())
+
.append(",\"syOt\":").append(tcpConfig.getMaxAllowedSyncMsgTimeoutCnt());
} else {
HttpMsgSenderConfig httpConfig = (HttpMsgSenderConfig)
sender.getConfigure();
-
strBuff.append(",\"iHttps\":").append(httpConfig.isRptDataByHttps())
-
.append(",\"sOut\":").append(httpConfig.getHttpSocketTimeoutMs())
-
.append(",\"cOut\":").append(httpConfig.getHttpConTimeoutMs())
-
.append(",\"asyWk\":").append(httpConfig.getHttpAsyncRptWorkerNum())
-
.append(",\"asyCh\":").append(httpConfig.getHttpAsyncRptCacheSize());
+ strBuff.append(",\"iHs\":").append(httpConfig.isRptDataByHttps())
+
.append(",\"sOt\":").append(httpConfig.getHttpSocketTimeoutMs())
+
.append(",\"cOt\":").append(httpConfig.getHttpConTimeoutMs())
+
.append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum())
+
.append(",\"aC\":").append(httpConfig.getHttpAsyncRptCacheSize());
}
String content = strBuff.append("}}").toString();
strBuff.delete(0, strBuff.length());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
index 1bd33306a1..5a5ad7deb5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
@@ -73,10 +73,10 @@ public class TimeCostInfo {
long curTotalCnt = totalCnt.sumThenReset();
if (curTotalCnt == 0) {
strBuff.append("\"").append(name)
-
.append("\":{\"bucketT\":{},\"min\":0,\"max\":0,\"avgT\":0}");
+ .append("\":{\"bkts\":{},\"min\":0,\"max\":0,\"avgT\":0}");
} else {
long bucketCnt = 0;
- strBuff.append("\"").append(name).append("\":{\"bucketT\":{");
+ strBuff.append("\"").append(name).append("\":{\"bkts\":{");
for (Map.Entry<String, LongAdder> entry :
sendTimeBucketT.entrySet()) {
if (bucketCnt++ > 0) {
strBuff.append(",");
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
index 2622c47cef..759148f4ff 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TrafficInfo.java
@@ -27,16 +27,44 @@ public class TrafficInfo {
private final String groupId;
// sId
private final String streamId;
- // sPkg
- private final LongAdder sendPkgCount = new LongAdder();
- // sMsg
- private final LongAdder sendMsgCount = new LongAdder();
- // fMsg
- private final LongAdder failedPgkCount = new LongAdder();
- // fMsg
- private final LongAdder failedMsgCount = new LongAdder();
- // sucMs: send success time cost in Ms
- private final TimeCostInfo sendCostMs = new TimeCostInfo("sucMs");
+
+ // sPs
+ private final LongAdder syncSendPkgCnt = new LongAdder();
+ // sMs
+ private final LongAdder syncSendMsgCount = new LongAdder();
+ // sPf
+ private final LongAdder syncFailedPgkCount = new LongAdder();
+ // sucMs: sync call time cost in Ms
+ private final TimeCostInfo syncSendCostMs = new TimeCostInfo("sSucMs");
+ // http
+ // apPs
+ private final LongAdder aSyncHttpPkgPutCnt = new LongAdder();
+ // apMs
+ private final LongAdder aSyncHttpMsgPutCnt = new LongAdder();
+ // apPf
+ private final LongAdder aSyncHttpFailPkgPutCnt = new LongAdder();
+ // agPs
+ private final LongAdder aSyncHttpPkgGetCnt = new LongAdder();
+ // agMs
+ private final LongAdder aSyncHttpMsgGetCnt = new LongAdder();
+ // aPs
+ private final LongAdder aSyncSendPkgCount = new LongAdder();
+ // aMs
+ private final LongAdder aSyncSendMsgCount = new LongAdder();
+ // aPf
+ private final LongAdder aSyncFailedPgkCnt = new LongAdder();
+ // arPs
+ private final LongAdder aRcvPkgCount = new LongAdder();
+ // arMs
+ private final LongAdder aRcvMsgCount = new LongAdder();
+ // arPf
+ private final LongAdder aRcvFailedPgkCount = new LongAdder();
+ // arMf
+ private final LongAdder aRcvFailedMsgCount = new LongAdder();
+ // sucMs: async received time cost in Ms
+ private final TimeCostInfo asyncSucCostMs = new TimeCostInfo("aSucMs");
+ // call back call count
+ private final LongAdder cbCallCount = new LongAdder();
// cbMs: call back time cost in Ms
private final TimeCostInfo callbackCostMs = new TimeCostInfo("cbMs");
@@ -45,27 +73,51 @@ public class TrafficInfo {
this.streamId = streamId;
}
- public void addSucMsgInfo(int msgCnt, long costMs) {
- sendPkgCount.add(1);
- sendMsgCount.add(msgCnt);
- sendCostMs.addTimeCostInMs(costMs);
+ public void addSyncSucMsgInfo(int msgCnt, long costMs) {
+ syncSendPkgCnt.increment();
+ syncSendMsgCount.add(msgCnt);
+ syncSendCostMs.addTimeCostInMs(costMs);
}
- public void addSucMsgInfo(int msgCnt, long sdCostMs, long cbCostMs) {
- sendPkgCount.add(1);
- sendMsgCount.add(msgCnt);
- sendCostMs.addTimeCostInMs(sdCostMs);
- callbackCostMs.addTimeCostInMs(cbCostMs);
+ public void addSyncFailMsgInfo(int msgCnt) {
+ syncFailedPgkCount.increment();
+ }
+
+ public void addAsyncSucSendInfo(int msgCnt) {
+ aSyncSendPkgCount.increment();
+ aSyncSendMsgCount.add(msgCnt);
}
- public void addFailMsgInfo(int msgCnt) {
- failedPgkCount.add(1);
- failedMsgCount.add(msgCnt);
+ public void addAsyncFailSendInfo(int msgCnt) {
+ aSyncFailedPgkCnt.increment();
+ }
+
+ public void addAsyncHttpSucPutInfo(int msgCnt) {
+ aSyncHttpPkgPutCnt.increment();
+ aSyncHttpMsgPutCnt.add(msgCnt);
+ }
+
+ public void addAsyncHttpFailPutInfo(int msgCnt) {
+ aSyncHttpFailPkgPutCnt.increment();
+ }
+
+ public void addAsyncHttpSucGetInfo(int msgCnt) {
+ aSyncHttpPkgGetCnt.increment();
+ aSyncHttpMsgGetCnt.add(msgCnt);
+ }
+
+ public void addAsyncSucRspInfo(int msgCnt, long sdCostMs, long cbCostMs) {
+ aRcvPkgCount.increment();
+ aRcvMsgCount.add(msgCnt);
+ asyncSucCostMs.addTimeCostInMs(sdCostMs);
+ cbCallCount.increment();
+ callbackCostMs.addTimeCostInMs(cbCostMs);
}
- public void addFailMsgInfo(int msgCnt, long cbCostMs) {
- failedPgkCount.add(1);
- failedMsgCount.add(msgCnt);
+ public void addAsyncFailRspInfo(int msgCnt, long cbCostMs) {
+ aRcvFailedPgkCount.increment();
+ aRcvFailedMsgCount.add(msgCnt);
+ cbCallCount.increment();
callbackCostMs.addTimeCostInMs(cbCostMs);
}
@@ -74,23 +126,50 @@ public class TrafficInfo {
if (StringUtils.isNotBlank(this.streamId)) {
strBuff.append("\",\"sId\":\"").append(this.streamId);
}
- strBuff.append("\",\"sPkg\":").append(sendPkgCount.sumThenReset())
- .append(",\"sMsg\":").append(sendMsgCount.sumThenReset())
- .append(",\"fPkg\":").append(failedPgkCount.sumThenReset())
- .append(",\"fMsg\":").append(failedMsgCount.sumThenReset())
- .append(",");
- this.sendCostMs.getAndResetValue(strBuff);
+ strBuff.append("\",\"sPs\":").append(syncSendPkgCnt.sumThenReset())
+ .append(",\"sPf\":").append(syncFailedPgkCount.sumThenReset())
+
.append(",\"sMs\":").append(syncSendMsgCount.sumThenReset()).append(",");
+ this.syncSendCostMs.getAndResetValue(strBuff);
+ strBuff.append(",\"apPs\":").append(aSyncHttpPkgPutCnt.sumThenReset())
+
.append(",\"apPf\":").append(aSyncHttpFailPkgPutCnt.sumThenReset())
+ .append(",\"agPs\":").append(aSyncHttpPkgGetCnt.sumThenReset())
+ .append(",\"aPs\":").append(aSyncSendPkgCount.sumThenReset())
+ .append(",\"aPf\":").append(aSyncFailedPgkCnt.sumThenReset())
+ .append(",\"arPs\":").append(aRcvPkgCount.sumThenReset())
+ .append(",\"arPf\":").append(aRcvFailedPgkCount.sumThenReset())
+ .append(",\"cbCt\":").append(cbCallCount.sumThenReset())
+ .append(",\"apMs\":").append(aSyncHttpMsgPutCnt.sumThenReset())
+ .append(",\"agMs\":").append(aSyncHttpMsgGetCnt.sumThenReset())
+ .append(",\"aMs\":").append(aSyncSendMsgCount.sumThenReset())
+ .append(",\"arMs\":").append(aRcvMsgCount.sumThenReset())
+
.append(",\"arMf\":").append(aRcvFailedMsgCount.sumThenReset()).append(",");
+ this.asyncSucCostMs.getAndResetValue(strBuff);
strBuff.append(",");
this.callbackCostMs.getAndResetValue(strBuff);
strBuff.append("}");
}
public void clear() {
- this.sendPkgCount.reset();
- this.sendMsgCount.reset();
- this.failedPgkCount.reset();
- this.failedMsgCount.reset();
- this.sendCostMs.clear();
+ this.syncSendPkgCnt.reset();
+ this.syncSendMsgCount.reset();
+ this.syncFailedPgkCount.reset();
+ this.syncSendCostMs.clear();
+ //
+ this.aSyncSendPkgCount.reset();
+ this.aSyncSendMsgCount.reset();
+ this.aSyncFailedPgkCnt.reset();
+ this.aSyncHttpPkgPutCnt.reset();
+ this.aSyncHttpMsgPutCnt.reset();
+ this.aSyncHttpFailPkgPutCnt.reset();
+ this.aSyncHttpPkgGetCnt.reset();
+ this.aSyncHttpMsgGetCnt.reset();
+ //
+ this.aRcvPkgCount.reset();
+ this.aRcvMsgCount.reset();
+ this.aRcvFailedPgkCount.reset();
+ this.aRcvFailedMsgCount.reset();
+ this.asyncSucCostMs.clear();
+ this.cbCallCount.reset();
this.callbackCostMs.clear();
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
index bc4d2884c3..1700e394d5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -80,6 +80,7 @@ public class HttpClientMgr implements ClientMgr {
private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
private final Semaphore asyncIdleCellCnt;
private final ExecutorService workerServices =
Executors.newCachedThreadPool();
+ private volatile boolean existSend = false;
private final AtomicBoolean shutDown = new AtomicBoolean(false);
// meta info
private ConcurrentHashMap<String, HostInfo> usingNodeMaps = new
ConcurrentHashMap<>();
@@ -126,9 +127,8 @@ public class HttpClientMgr implements ClientMgr {
long stopTime = System.currentTimeMillis();
logger.info("ClientMgr({}) is closing...", this.sender.getSenderId());
if (!messageCache.isEmpty()) {
- if (httpConfig.isDiscardHttpCacheWhenClosing()) {
- messageCache.clear();
- } else {
+ if (!httpConfig.isDiscardHttpCacheWhenClosing()) {
+ // wait last event report
long startTime = System.currentTimeMillis();
while (!messageCache.isEmpty()) {
if (System.currentTimeMillis() - startTime >=
httpConfig.getHttpCloseWaitPeriodMs()) {
@@ -136,8 +136,43 @@ public class HttpClientMgr implements ClientMgr {
}
ProxyUtils.sleepSomeTime(100L);
}
- remainCnt = messageCache.size();
- messageCache.clear();
+ }
+ // force exist report
+ existSend = true;
+ // call back result
+ boolean isSucc;
+ long currentTime;
+ HttpAsyncObj asyncObj;
+ while (!messageCache.isEmpty()) {
+ asyncObj = messageCache.poll();
+ if (asyncObj == null) {
+ continue;
+ }
+ isSucc = true;
+ currentTime = System.currentTimeMillis();
+ sender.getMetricHolder().addAsyncHttpSucGetMetric(
+ asyncObj.getHttpEvent().getGroupId(),
+ asyncObj.getHttpEvent().getStreamId(),
+ asyncObj.getHttpEvent().getMsgCnt());
+ try {
+ asyncObj.getCallback().onMessageAck(new
ProcessResult(ErrorCode.SDK_CLOSED));
+ } catch (Throwable ex) {
+ isSucc = false;
+ if (asyncSendExptCnt.shouldPrint()) {
+ logger.error("HttpAsync({}) callback event exception",
this.sender.getSenderId(), ex);
+ }
+ } finally {
+ asyncIdleCellCnt.release();
+ if (isSucc) {
+
sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(),
+ asyncObj.getHttpEvent().getStreamId(),
asyncObj.getHttpEvent().getMsgCnt(),
+ (currentTime - asyncObj.getRptMs()),
(System.currentTimeMillis() - currentTime));
+ } else {
+
sender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
+ asyncObj.getHttpEvent().getGroupId(),
asyncObj.getHttpEvent().getStreamId(),
+ asyncObj.getHttpEvent().getMsgCnt(),
(System.currentTimeMillis() - currentTime));
+ }
+ }
}
}
workerServices.shutdown();
@@ -443,10 +478,17 @@ public class HttpClientMgr implements ClientMgr {
// if not shutdown or queue is not empty
while (!shutDown.get() || !messageCache.isEmpty()) {
while (!messageCache.isEmpty()) {
+ if (existSend) {
+ break;
+ }
asyncObj = messageCache.poll();
if (asyncObj == null) {
continue;
}
+ sender.getMetricHolder().addAsyncHttpSucGetMetric(
+ asyncObj.getHttpEvent().getGroupId(),
+ asyncObj.getHttpEvent().getStreamId(),
+ asyncObj.getHttpEvent().getMsgCnt());
try {
sendMessage(asyncObj.getHttpEvent(), procResult);
curTime = System.currentTimeMillis();
@@ -468,6 +510,9 @@ public class HttpClientMgr implements ClientMgr {
}
}
}
+ if (existSend) {
+ break;
+ }
ProxyUtils.sleepSomeTime(httpConfig.getHttpAsyncWorkerIdleWaitMs());
}
logger.info("HttpAsyncReportWorker({}) stopped", this.workerId);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index ec048f54ab..0a9c74b616 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -281,7 +281,7 @@ public class TcpClientMgr implements ClientMgr {
}
rmvMsgStubInfo(encObject.getMessageId());
}
- return procResult.setSuccess();
+ return procResult.isSuccess();
} else {
// process sync report
if (!client.write(clientTerm, encObject, procResult)) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
index 29899e6f58..c521b04cd1 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
@@ -265,6 +265,7 @@ public class TcpNettyClient {
this.msgSentCnt.incrementAndGet();
this.channel.writeAndFlush(encodeObject);
this.msgInflightCnt.incrementAndGet();
+ return procResult.setSuccess();
} catch (Throwable ex) {
if (conExptCnt.shouldPrint()) {
logger.warn("NettyClient({}) write {} exception",
@@ -274,7 +275,6 @@ public class TcpNettyClient {
} finally {
this.rw.readLock().unlock();
}
- return procResult.setSuccess();
}
public void setFrozen(long termId) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index 3720a98ecb..99e0c38289 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -28,6 +28,7 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.metric.MetricDataHolder;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +86,8 @@ public abstract class BaseSender implements ConfigHolder {
this.baseConfig = configure.clone();
this.senderFactory = senderFactory;
this.factoryClusterIdKey = clusterIdKey;
- this.senderId = configure.getDataRptProtocol() + "-" +
senderIdGen.incrementAndGet();
+ this.senderId = configure.getDataRptProtocol()
+ + "-" + ProxyUtils.getProcessPid() + "-" +
senderIdGen.incrementAndGet();
this.configManager = new ProxyConfigManager(this.senderId,
this.baseConfig, this);
this.configManager.setDaemon(true);
this.metricHolder = new MetricDataHolder(this);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
index 1c6e0a53e6..b764e40b52 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
@@ -67,10 +67,10 @@ public class InLongHttpMsgSender extends BaseSender
implements HttpMsgSender {
return httpClientMgr.sendMessage(eventInfo, procResult);
} finally {
if (procResult.isSuccess()) {
- metricHolder.addSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
+ metricHolder.addSyncSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
} else {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ metricHolder.addSyncFailMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}
@@ -92,8 +92,11 @@ public class InLongHttpMsgSender extends BaseSender
implements HttpMsgSender {
}
return httpClientMgr.asyncSendMessage(new HttpAsyncObj(eventInfo,
callback), procResult);
} finally {
- if (!procResult.isSuccess()) {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ if (procResult.isSuccess()) {
+ metricHolder.addAsyncHttpSucPutMetric(
+ eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
+ } else {
+ metricHolder.addAsyncHttpFailPutMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
index 084cd3b8a7..8add9f15f3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -84,10 +84,10 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
return processEvent(SendQos.SOURCE_ACK, eventInfo, null,
procResult);
} finally {
if (procResult.isSuccess()) {
- metricHolder.addSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
+ metricHolder.addSyncSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
} else {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ metricHolder.addSyncFailMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}
@@ -100,15 +100,14 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- long curTime = System.currentTimeMillis();
try {
return processEvent(SendQos.SOURCE_ACK, eventInfo, callback,
procResult);
} finally {
if (procResult.isSuccess()) {
- metricHolder.addSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
- eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
+ metricHolder.addAsyncSucReqMetric(
+ eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
} else {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}
@@ -125,10 +124,10 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
return processEvent(SendQos.NO_ACK, eventInfo, null, procResult);
} finally {
if (procResult.isSuccess()) {
- metricHolder.addSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
+ metricHolder.addSyncSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
} else {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ metricHolder.addSyncFailMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}
@@ -145,10 +144,10 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
} finally {
if (procResult.isSuccess()) {
- metricHolder.addSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
+ metricHolder.addSyncSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
} else {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ metricHolder.addSyncFailMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}
@@ -161,15 +160,14 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
- long curTime = System.currentTimeMillis();
try {
return processEvent(SendQos.SINK_ACK, eventInfo, callback,
procResult);
} finally {
if (procResult.isSuccess()) {
- metricHolder.addSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
- eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
+ metricHolder.addAsyncSucReqMetric(
+ eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
} else {
- metricHolder.addFailMetric(procResult.getErrCode(),
+ metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
}