This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 84748072a4 [INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo implementation (#11714)
84748072a4 is described below
commit 84748072a44af7d38f33894843f093746446d3d0
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Feb 3 10:32:48 2025 +0800
[INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo implementation (#11714)
* [INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo implementation
* [INLONG-11713][SDK] Optimize BaseMsgSenderFactory and TimeCostInfo implementation
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../manager/service/cluster/InlongClusterServiceTest.java | 2 +-
.../org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 5 +++--
.../org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java | 10 +++++-----
3 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index bea9e09930..fee6c91d53 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -358,7 +358,7 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
Assertions.assertNotNull(id);
// save cluster node
- String ip = "127.0.0.1";
+ String ip = "127.0.0.2";
Integer port1 = 46800;
Integer nodeId1 =
this.saveDataProxyClusterNode(id, ClusterType.DATAPROXY, ip,
port1, ProtocolType.TCP);
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 50fa183c58..923ebd2b9f 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -68,6 +68,7 @@ public class BaseMsgSenderFactory {
public void close() {
int totalSenderCnt;
int totalTDBankCnt;
+ logger.info("MsgSenderFactory({}) is closing", this.factoryNo);
senderCacheLock.writeLock().lock();
try {
// release groupId mapped senders
@@ -197,7 +198,7 @@ public class BaseMsgSenderFactory {
validProxyConfigNotNull(configure);
// get groupId's clusterIdKey
ProcessResult procResult = new ProcessResult();
- ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure,
procResult);;
+ ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure,
procResult);
String clusterIdKey = ProxyUtils.buildClusterIdKey(
configure.getDataRptProtocol(), configure.getRegionName(),
proxyConfigEntry.getClusterId());
// get local built sender
@@ -288,7 +289,7 @@ public class BaseMsgSenderFactory {
&& !inlongMetaQryMgr.getEncryptConfigure(true, procResult)) {
throw new ProxySdkException("Failed to query remote encrypt config: " + procResult);
}
- return inlongMetaQryMgr.getProxyConfigEntry();
+ return (ProxyConfigEntry) procResult.getRetData();
}
private boolean removeGroupIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
index a7545b5d60..50c2e1a6bc 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/TimeCostInfo.java
@@ -70,22 +70,22 @@ public class TimeCostInfo {
}
public void getAndResetValue(StringBuilder strBuff) {
- long curCnt = totalCnt.sumThenReset();
- if (curCnt == 0) {
+ long curTotalCnt = totalCnt.sumThenReset();
+ if (curTotalCnt == 0) {
strBuff.append("\"").append(name)
.append("\":{\"bucketT\":{},\"min\":0,\"max\":0,\"avgT\":0,\"cnt\":0}");
} else {
- curCnt = 0;
+ long bucketCnt = 0;
strBuff.append("\"").append(name).append("\":{\"bucketT\":{");
for (Map.Entry<String, LongAdder> entry :
sendTimeBucketT.entrySet()) {
- if (curCnt++ > 0) {
+ if (bucketCnt++ > 0) {
strBuff.append(",");
}
strBuff.append("\"").append(entry.getKey()).append("\":").append(entry.getValue());
}
strBuff.append("},\"min\":").append(this.minValue.getAndSet(Long.MAX_VALUE))
.append(",\"max\":").append(this.maxValue.getAndSet(Long.MIN_VALUE))
- .append(",\"avgT\":").append(sumTime.sumThenReset() /
curCnt).append("}");
+ .append(",\"avgT\":").append(sumTime.sumThenReset() /
curTotalCnt).append("}");
sendTimeBucketT.clear();
}
}