This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dd9328c275 [INLONG-11754][SDK] Add the total number of in-flight
requests and total size limits (#11755)
dd9328c275 is described below
commit dd9328c275b2342cfece3f1ed0939ce43b2b71cb
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Feb 13 13:00:46 2025 +0800
[INLONG-11754][SDK] Add the total number of in-flight requests and total
size limits (#11755)
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../apache/inlong/agent/core/HeartbeatManager.java | 3 +-
.../plugin/sinks/filecollect/SenderManager.java | 2 +-
.../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 15 +-
.../inlong/sdk/dataproxy/MsgSenderFactory.java | 8 +
.../sdk/dataproxy/MsgSenderMultiFactory.java | 14 +-
.../sdk/dataproxy/MsgSenderSingleFactory.java | 14 +-
.../inlong/sdk/dataproxy/common/ErrorCode.java | 14 +-
.../sdk/dataproxy/common/ProxyClientConfig.java | 49 +++++-
.../inlong/sdk/dataproxy/common/SdkConsts.java | 13 +-
.../sdk/dataproxy/metric/MetricDataHolder.java | 18 ++-
.../sdk/dataproxy/network/PkgCacheQuota.java | 171 +++++++++++++++++++++
.../sdk/dataproxy/network/http/HttpClientMgr.java | 18 +--
.../sdk/dataproxy/network/tcp/TcpCallFuture.java | 6 +
.../sdk/dataproxy/network/tcp/TcpClientMgr.java | 20 ++-
.../sdk/dataproxy/network/tcp/TcpNettyClient.java | 2 +-
.../dataproxy/network/tcp/codec/EncodeObject.java | 8 +-
.../inlong/sdk/dataproxy/sender/BaseSender.java | 74 +++++++++
.../dataproxy/sender/http/HttpMsgSenderConfig.java | 18 +--
.../dataproxy/sender/http/InLongHttpMsgSender.java | 8 +
.../dataproxy/sender/tcp/InLongTcpMsgSender.java | 37 ++++-
.../inlong/sdk/dataproxy/PkgCacheQuotaTest.java | 88 +++++++++++
.../sdk/dataproxy/ProxyClientConfigTest.java | 8 +-
22 files changed, 552 insertions(+), 56 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 50282be586..82cd5cb648 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -201,7 +201,8 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
try {
proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
-
proxyClientConfig.setSendBufferSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+ proxyClientConfig.setMaxInFlightSizeInKb(
+ CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE /
1024);
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setRequestTimeoutMs(30000L);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 34c1b67f70..76593b7512 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -203,7 +203,7 @@ public class SenderManager {
private void createMessageSender() throws Exception {
TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
managerAddr, inlongGroupId, authSecretId, authSecretKey);
- proxyClientConfig.setSendBufferSize(totalAsyncBufSize);
+ proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize / 1024);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 623a318188..ce276b9cf6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -19,9 +19,11 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -52,6 +54,7 @@ public class BaseMsgSenderFactory {
// msg send factory
private final MsgSenderFactory msgSenderFactory;
private final String factoryNo;
+ private final PkgCacheQuota pkgCacheQuota;
// for senders
private final ReentrantReadWriteLock senderCacheLock = new
ReentrantReadWriteLock();
// for inlong groupId -- Sender map
@@ -59,10 +62,14 @@ public class BaseMsgSenderFactory {
// for inlong clusterId -- Sender map
private final ConcurrentHashMap<String, BaseSender> clusterIdSenderMap =
new ConcurrentHashMap<>();
- public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String
factoryNo) {
+ protected BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory,
+ String factoryNo, int factoryPkgCntPermits, int
factoryPkgSizeKbPermits) {
this.msgSenderFactory = msgSenderFactory;
this.factoryNo = factoryNo;
- logger.info("MsgSenderFactory({}) started", this.factoryNo);
+ this.pkgCacheQuota = new PkgCacheQuota(true, factoryNo,
+ factoryPkgCntPermits, factoryPkgSizeKbPermits,
SdkConsts.VAL_DEF_PADDING_SIZE);
+ logger.info("MsgSenderFactory({}) started, factoryPkgCntPermits={},
factoryPkgSizeKbPermits={}",
+ this.factoryNo, factoryPkgCntPermits, factoryPkgSizeKbPermits);
}
public void close() {
@@ -105,6 +112,10 @@ public class BaseMsgSenderFactory {
}
}
+ public PkgCacheQuota getPkgCacheQuota() {
+ return pkgCacheQuota;
+ }
+
public int getMsgSenderCount() {
return groupIdSenderMap.size() + clusterIdSenderMap.size();
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
index 7fbae90964..3d7b8fb4a5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -53,6 +54,13 @@ public interface MsgSenderFactory {
*/
int getMsgSenderCount();
+ /**
+ * Get factory level inflight request quota
+ *
+ * @return the factory level inflight request quota
+ */
+ PkgCacheQuota getFactoryPkgCacheQuota();
+
/**
* Get or generate a sender from the factory according to groupId
*
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index 559b7d7bac..7d4f9d5110 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -41,8 +43,13 @@ public class MsgSenderMultiFactory implements
MsgSenderFactory {
private final BaseMsgSenderFactory baseMsgSenderFactory;
public MsgSenderMultiFactory() {
+ this(SdkConsts.UNDEFINED_VALUE, SdkConsts.UNDEFINED_VALUE);
+ }
+
+ public MsgSenderMultiFactory(int factoryPkgCntPermits, int
factoryPkgSizeKbPermits) {
this.baseMsgSenderFactory = new BaseMsgSenderFactory(this,
- "iMultiFact-" + ProxyUtils.getProcessPid() + "-" +
refCounter.incrementAndGet());
+ "iMultiFact-" + ProxyUtils.getProcessPid() + "-" +
refCounter.incrementAndGet(),
+ factoryPkgCntPermits, factoryPkgSizeKbPermits);
this.initialized.set(true);
}
@@ -69,6 +76,11 @@ public class MsgSenderMultiFactory implements
MsgSenderFactory {
return this.baseMsgSenderFactory.getMsgSenderCount();
}
+ @Override
+ public PkgCacheQuota getFactoryPkgCacheQuota() {
+ return baseMsgSenderFactory.getPkgCacheQuota();
+ }
+
@Override
public InLongTcpMsgSender genTcpSenderByGroupId(
TcpMsgSenderConfig configure) throws ProxySdkException {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index 4e99c6550b..4d795101c2 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sdk.dataproxy;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -42,10 +44,15 @@ public class MsgSenderSingleFactory implements
MsgSenderFactory {
private static BaseMsgSenderFactory baseMsgSenderFactory;
public MsgSenderSingleFactory() {
+ this(SdkConsts.UNDEFINED_VALUE, SdkConsts.UNDEFINED_VALUE);
+ }
+
+ public MsgSenderSingleFactory(int factoryPkgCntPermits, int
factoryPkgSizeKbPermits) {
synchronized (singletonRefCounter) {
if (singletonRefCounter.incrementAndGet() == 1) {
baseMsgSenderFactory = new BaseMsgSenderFactory(this,
- "iSingleFct-" + ProxyUtils.getProcessPid() + "-" +
refCounter.incrementAndGet());
+ "iSingleFct-" + ProxyUtils.getProcessPid() + "-" +
refCounter.incrementAndGet(),
+ factoryPkgCntPermits, factoryPkgSizeKbPermits);
initialized.set(true);
}
}
@@ -86,6 +93,11 @@ public class MsgSenderSingleFactory implements
MsgSenderFactory {
return baseMsgSenderFactory.getMsgSenderCount();
}
+ @Override
+ public PkgCacheQuota getFactoryPkgCacheQuota() {
+ return baseMsgSenderFactory.getPkgCacheQuota();
+ }
+
@Override
public InLongTcpMsgSender genTcpSenderByGroupId(
TcpMsgSenderConfig configure) throws ProxySdkException {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index b1dd6f26c3..379d97c4f4 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -63,6 +63,13 @@ public enum ErrorCode {
//
FETCH_PROXY_META_FAILURE(59, "Fetch dataproxy meta info failure"),
FETCH_ENCRYPT_META_FAILURE(60, "Fetch encrypt meta info failure"),
+
+ //
+ INF_REQ_COUNT_REACH_FACTORY_LIMIT(71, "In-flight Request count reach
factory limit"),
+ INF_REQ_SIZE_REACH_FACTORY_LIMIT(72, "In-flight Request size reach factory
limit"),
+ INF_REQ_COUNT_REACH_SDK_LIMIT(73, "In-flight Request count reach sdk
limit"),
+ INF_REQ_SIZE_REACH_SDK_LIMIT(74, "In-flight Request size reach sdk limit"),
+
//
NO_NODE_META_INFOS(81, "No proxy node metadata info in local"),
EMPTY_ACTIVE_NODE_SET(82, "Empty active node set"),
@@ -94,10 +101,9 @@ public enum ErrorCode {
//
DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
//
- HTTP_ASYNC_POOL_FULL(171, "Http async pool full"),
- HTTP_ASYNC_OFFER_FAIL(172, "Http async offer event fail"),
- HTTP_ASYNC_OFFER_EXCEPTION(173, "Http async offer event exception"),
- HTTP_BUILD_CLIENT_EXCEPTION(174, "Http build client exception"),
+ HTTP_ASYNC_OFFER_FAIL(171, "Http async offer event fail"),
+ HTTP_ASYNC_OFFER_EXCEPTION(172, "Http async offer event exception"),
+ HTTP_BUILD_CLIENT_EXCEPTION(173, "Http build client exception"),
//
BUILD_FORM_CONTENT_EXCEPTION(181, "Build form content exception"),
DP_RETURN_FAILURE(182, "DataProxy return failure"),
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index 4079eaf81e..1065bbf259 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -85,6 +85,11 @@ public class ProxyClientConfig implements Cloneable {
private int aliveConnections = SdkConsts.VAL_DEF_ALIVE_CONNECTIONS;
// node forced selection interval ms
private long forceReChooseInrMs = SdkConsts.VAL_DEF_FORCE_CHOOSE_INR_MS;
+ // max inflight request count and size per SDK
+ private int maxInFlightReqCnt = SdkConsts.UNDEFINED_VALUE;
+ private int maxInFlightSizeKb = SdkConsts.UNDEFINED_VALUE;
+ private int paddingSize = SdkConsts.VAL_DEF_PADDING_SIZE;
+
// metric setting
private final MetricConfig metricConfig = new MetricConfig();
// report setting
@@ -304,7 +309,6 @@ public class ProxyClientConfig implements Cloneable {
public void setMetaSyncWaitMsIfFail(int metaSyncWaitMsIfFail) {
this.metaSyncWaitMsIfFail =
Math.min(SdkConsts.VAL_MAX_WAIT_MS_IF_CONFIG_REQ_FAIL,
Math.max(SdkConsts.VAL_MIN_WAIT_MS_IF_CONFIG_REQ_FAIL,
metaSyncWaitMsIfFail));
-
}
public String getMetaStoreBasePath() {
@@ -389,6 +393,40 @@ public class ProxyClientConfig implements Cloneable {
this.metricConfig.setMetricKeyMaskInfos(maskGroupId, maskStreamId);
}
+ public int getMaxInFlightReqCnt() {
+ return maxInFlightReqCnt;
+ }
+
+ public int getMaxInFlightSizeKb() {
+ return maxInFlightSizeKb;
+ }
+
+ public void setMaxInFlightReqCnt(int maxInFlightReqCnt) {
+ if (maxInFlightReqCnt > 0) {
+ this.maxInFlightReqCnt = maxInFlightReqCnt;
+ }
+ }
+
+ public void setMaxInFlightSizeInKb(int maxInFlightSizeInKb) {
+ if (maxInFlightSizeInKb > 0) {
+ this.maxInFlightSizeKb = maxInFlightSizeInKb;
+ }
+ }
+
+ public void setMaxInFlightPermitsPerSdk(int maxInFlightReqCnt, int
maxInFlightSizeInKb) {
+ setMaxInFlightReqCnt(maxInFlightReqCnt);
+ setMaxInFlightSizeInKb(maxInFlightSizeInKb);
+ }
+
+ public int getPaddingSize() {
+ return paddingSize;
+ }
+
+ public void setPaddingSize(int paddingSize) {
+ this.paddingSize = Math.min(SdkConsts.VAL_MAX_PADDING_SIZE,
+ Math.max(SdkConsts.VAL_MIN_PADDING_SIZE, paddingSize));
+ }
+
public MetricConfig getMetricConfig() {
return metricConfig;
}
@@ -413,6 +451,9 @@ public class ProxyClientConfig implements Cloneable {
&& aliveConnections == that.aliveConnections
&& forceReChooseInrMs == that.forceReChooseInrMs
&& enableReportAuthz == that.enableReportAuthz
+ && maxInFlightReqCnt == that.maxInFlightReqCnt
+ && maxInFlightSizeKb == that.maxInFlightSizeKb
+ && paddingSize == that.paddingSize
&& enableReportEncrypt == that.enableReportEncrypt
&& Objects.equals(tlsVersion, that.tlsVersion)
&& Objects.equals(managerIP, that.managerIP)
@@ -439,7 +480,8 @@ public class ProxyClientConfig implements Cloneable {
mgrMetaSyncInrMs, metaSyncMaxRetryIfFail, metaStoreBasePath,
metaCacheExpiredMs, metaQryFailCacheExpiredMs,
aliveConnections,
forceReChooseInrMs, metricConfig, enableReportAuthz,
enableReportEncrypt,
- rptRsaPubKeyUrl, rptUserName, rptSecretKey);
+ rptRsaPubKeyUrl, rptUserName, rptSecretKey, maxInFlightReqCnt,
+ maxInFlightSizeKb, paddingSize);
}
@Override
@@ -476,6 +518,9 @@ public class ProxyClientConfig implements Cloneable {
.append(", forceReChooseInrMs=").append(forceReChooseInrMs)
.append(", enableReportAuthz=").append(enableReportAuthz)
.append(", enableReportEncrypt=").append(enableReportEncrypt)
+ .append(", maxInFlightReqCnt=").append(maxInFlightReqCnt)
+ .append(", maxInFlightSizeKb=").append(maxInFlightSizeKb)
+ .append(", paddingSize=").append(paddingSize)
.append(", rptRsaPubKeyUrl='").append(rptRsaPubKeyUrl)
.append("', rptUserName='").append(rptUserName)
.append("', rptSecretKey='").append(rptSecretKey)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 7979851d3e..ae4d5e64dd 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -36,6 +36,8 @@ public class SdkConsts {
public static final String VAL_DEF_REGION_NAME = "";
// http report method
public static final String DATAPROXY_REPORT_METHOD = "/dataproxy/message";
+ // undefined value
+ public static final int UNDEFINED_VALUE = -1;
// config info sync interval in minutes
public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
@@ -110,9 +112,8 @@ public class SdkConsts {
public static final long VAL_DEF_HTTP_REUSE_FAIL_WAIT_MS = 20000L;
public static final long VAL_MAX_HTTP_REUSE_FAIL_WAIT_MS = 300000L;
public static final long VAL_MIN_HTTP_REUSE_FAIL_WAIT_MS = 1000L;
- // HTTP async report request cache size
- public static final int VAL_DEF_HTTP_ASYNC_RPT_CACHE_SIZE = 2000;
- public static final int VAL_MIN_HTTP_ASYNC_RPT_CACHE_SIZE = 1;
+ // HTTP async report request cache request count
+ public static final int VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT = 2000;
// HTTP async report worker thread count
public static final int VAL_DEF_HTTP_ASYNC_RPT_WORKER_NUM =
Runtime.getRuntime().availableProcessors();
@@ -133,4 +134,10 @@ public class SdkConsts {
/* Reserved attribute data size(bytes). */
public static int RESERVED_ATTRIBUTE_LENGTH = 256;
+ // unit KB
+ public static final int UNIT_KB_SIZE = 1024;
+ // padding size per package
+ public static final int VAL_DEF_PADDING_SIZE = 200;
+ public static final int VAL_MIN_PADDING_SIZE = 0;
+ public static final int VAL_MAX_PADDING_SIZE = 4096;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 4b16f62c8d..627ab30dfb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -22,8 +22,8 @@ import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +40,6 @@ public class MetricDataHolder implements Runnable {
private static final String DEFAULT_KEY_SPLITTER = "#";
private static final Logger logger =
LoggerFactory.getLogger(MetricDataHolder.class);
- private static final LogCounter exceptCnt = new LogCounter(5, 100000, 60 *
1000L);
private final MetricConfig metricConfig;
private final BaseSender sender;
@@ -457,10 +456,22 @@ public class MetricDataHolder implements Runnable {
.append(",\"lrT\":").append(lstReportTime)
.append(",");
metricUnit.getAndResetValue(strBuff);
+ Tuple2<Integer, Integer> factoryAvailQuota =
sender.getFactoryAvailQuota();
+ Tuple2<Integer, Integer> senderAvailQuota =
sender.getSenderAvailQuota();
strBuff.append(",\"s\":{\"tNs\":").append(sender.getProxyNodeCnt())
.append(",\"aNs\":").append(sender.getActiveNodeCnt())
.append(",\"ifRs\":").append(sender.getInflightMsgCnt())
+ .append(",\"afPc\":").append(factoryAvailQuota.getF0())
+ .append(",\"afPs\":").append(factoryAvailQuota.getF1())
+ .append(",\"aPc\":").append(senderAvailQuota.getF0())
+ .append(",\"aPs\":").append(senderAvailQuota.getF1())
.append("},\"c\":{\"aC\":").append(sender.getConfigure().getAliveConnections())
+ .append(",\"gBf\":").append(sender.isGenByFactory())
+ .append(",\"ifCc\":").append(sender.getFactoryPkgCntPermits())
+ .append(",\"ifCs\":").append(sender.getFactoryPkgCntPermits())
+
.append(",\"iCc\":").append(sender.getConfigure().getMaxInFlightReqCnt())
+
.append(",\"iCs\":").append(sender.getConfigure().getMaxInFlightSizeKb())
+
.append(",\"iRp\":").append(sender.getConfigure().getPaddingSize())
.append(",\"rP\":\"").append(sender.getConfigure().getDataRptProtocol())
.append("\",\"rG\":\"").append(sender.getConfigure().getRegionName())
.append("\"");
@@ -481,8 +492,7 @@ public class MetricDataHolder implements Runnable {
strBuff.append(",\"iHs\":").append(httpConfig.isRptDataByHttps())
.append(",\"sOt\":").append(httpConfig.getHttpSocketTimeoutMs())
.append(",\"cOt\":").append(httpConfig.getHttpConTimeoutMs())
-
.append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum())
-
.append(",\"aC\":").append(httpConfig.getHttpAsyncRptCacheSize());
+
.append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum());
}
String content = strBuff.append("}}").toString();
strBuff.delete(0, strBuff.length());
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/PkgCacheQuota.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/PkgCacheQuota.java
new file mode 100644
index 0000000000..424ec41554
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/PkgCacheQuota.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Package Cache Quota class
+ *
+ * Used to manage the total number and byte size of ongoing requests.
+ */
+public class PkgCacheQuota {
+
+ private static final Logger logger =
LoggerFactory.getLogger(PkgCacheQuota.class);
+ public static final Tuple2<Integer, Integer> DISABLE_RET =
+ new Tuple2<>(SdkConsts.UNDEFINED_VALUE, SdkConsts.UNDEFINED_VALUE);
+ // whether factory level quota
+ private final boolean factoryLevel;
+ // pkg count permits
+ private final int pkgCntPermits;
+ // pkg size permits
+ private final int pkgSizeKbPermits;
+ // reserved size per package
+ private final int paddingSizePerPkg;
+ // whether disable package inflight quota function
+ private final boolean disabled;
+ // package count quota
+ private final Semaphore pkgCntQuota;
+ // package size quota in KB
+ private final Semaphore pkgSizeKbQuota;
+
+ public PkgCacheQuota(boolean factoryLevel, String parentId,
+ int pkgCntPermits, int pkgSizeKbPermits, int paddingSizePerPkg) {
+ this.factoryLevel = factoryLevel;
+ this.pkgCntPermits = pkgCntPermits;
+ this.pkgSizeKbPermits = pkgSizeKbPermits;
+ this.paddingSizePerPkg = paddingSizePerPkg;
+ if (pkgCntPermits > 0) {
+ this.pkgCntQuota = new Semaphore(pkgCntPermits);
+ } else {
+ this.pkgCntQuota = null;
+ }
+ if (pkgSizeKbPermits > 0) {
+ this.pkgSizeKbQuota = new Semaphore(pkgSizeKbPermits);
+ } else {
+ this.pkgSizeKbQuota = null;
+ }
+ this.disabled = (this.pkgCntQuota == null
+ && this.pkgSizeKbQuota == null);
+ logger.info("PkgCacheQuota({}) created,
factoryLevel={},pkgCnt={},pkgSizeKb={},padSize={}",
+ factoryLevel, parentId, pkgCntPermits, pkgSizeKbPermits,
paddingSizePerPkg);
+ }
+
+ public Tuple2<Integer, Integer> getPkgCacheAvailQuota() {
+ if (this.disabled) {
+ return DISABLE_RET;
+ }
+ int cntAvailable = SdkConsts.UNDEFINED_VALUE;
+ if (this.pkgCntQuota != null) {
+ cntAvailable = this.pkgCntQuota.availablePermits();
+ }
+ int sizeAvailable = SdkConsts.UNDEFINED_VALUE;
+ if (this.pkgSizeKbQuota != null) {
+ sizeAvailable = this.pkgSizeKbQuota.availablePermits();
+ }
+ return new Tuple2<>(cntAvailable, sizeAvailable);
+ }
+
+ public boolean tryAcquire(int sizeInByte, ProcessResult procResult) {
+ if (this.disabled) {
+ return procResult.setSuccess();
+ }
+ if (this.pkgCntQuota == null) {
+ if (this.pkgSizeKbQuota.tryAcquire(
+ getSizeKbPermitsByBytes(sizeInByte))) {
+ return procResult.setSuccess();
+ }
+ return procResult.setFailResult(factoryLevel
+ ? ErrorCode.INF_REQ_SIZE_REACH_FACTORY_LIMIT
+ : ErrorCode.INF_REQ_SIZE_REACH_SDK_LIMIT);
+ } else {
+ if (this.pkgCntQuota.tryAcquire(1)) {
+ if (this.pkgSizeKbQuota == null) {
+ return procResult.setSuccess();
+ } else {
+ if (this.pkgSizeKbQuota.tryAcquire(
+ getSizeKbPermitsByBytes(sizeInByte))) {
+ return procResult.setSuccess();
+ } else {
+ this.pkgCntQuota.release();
+ return procResult.setFailResult(factoryLevel
+ ? ErrorCode.INF_REQ_SIZE_REACH_FACTORY_LIMIT
+ : ErrorCode.INF_REQ_SIZE_REACH_SDK_LIMIT);
+ }
+ }
+ } else {
+ return procResult.setFailResult(factoryLevel
+ ? ErrorCode.INF_REQ_COUNT_REACH_FACTORY_LIMIT
+ : ErrorCode.INF_REQ_COUNT_REACH_SDK_LIMIT);
+ }
+ }
+ }
+
+ public void release(int sizeInByte) {
+ if (this.disabled) {
+ return;
+ }
+ if (this.pkgCntQuota == null) {
+ this.pkgSizeKbQuota.release(
+ getSizeKbPermitsByBytes(sizeInByte));
+ } else {
+ if (this.pkgSizeKbQuota != null) {
+ this.pkgSizeKbQuota.release(
+ getSizeKbPermitsByBytes(sizeInByte));
+ }
+ this.pkgCntQuota.release();
+ }
+ }
+
+ public boolean isFactoryLevel() {
+ return factoryLevel;
+ }
+
+ public int getPkgCntPermits() {
+ return pkgCntPermits;
+ }
+
+ public int getPkgSizeKbPermits() {
+ return pkgSizeKbPermits;
+ }
+
+ public int getPaddingSizePerPkg() {
+ return paddingSizePerPkg;
+ }
+
+ public boolean isDisabled() {
+ return disabled;
+ }
+
+ private int getSizeKbPermitsByBytes(int sizeInByte) {
+ int tmpValue = sizeInByte + this.paddingSizePerPkg;
+ if (tmpValue % SdkConsts.UNIT_KB_SIZE == 0) {
+ return (tmpValue / SdkConsts.UNIT_KB_SIZE);
+ } else {
+ return (tmpValue / SdkConsts.UNIT_KB_SIZE) + 1;
+ }
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
index 1700e394d5..cc325290a6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -78,7 +77,6 @@ public class HttpClientMgr implements ClientMgr {
private final HttpMsgSenderConfig httpConfig;
private CloseableHttpClient httpClient;
private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
- private final Semaphore asyncIdleCellCnt;
private final ExecutorService workerServices =
Executors.newCachedThreadPool();
private volatile boolean existSend = false;
private final AtomicBoolean shutDown = new AtomicBoolean(false);
@@ -94,8 +92,7 @@ public class HttpClientMgr implements ClientMgr {
public HttpClientMgr(BaseSender sender, HttpMsgSenderConfig httpConfig) {
this.sender = sender;
this.httpConfig = httpConfig;
- this.messageCache = new
LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize());
- this.asyncIdleCellCnt = new
Semaphore(httpConfig.getHttpAsyncRptCacheSize(), true);
+ this.messageCache = new
LinkedBlockingQueue<>(httpConfig.getMaxInFlightReqCnt());
}
@Override
@@ -162,7 +159,7 @@ public class HttpClientMgr implements ClientMgr {
logger.error("HttpAsync({}) callback event exception",
this.sender.getSenderId(), ex);
}
} finally {
- asyncIdleCellCnt.release();
+
sender.releaseCachePermits(asyncObj.getHttpEvent().getBodySize());
if (isSucc) {
sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(),
asyncObj.getHttpEvent().getStreamId(),
asyncObj.getHttpEvent().getMsgCnt(),
@@ -282,21 +279,12 @@ public class HttpClientMgr implements ClientMgr {
if (curNodes.isEmpty()) {
return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
}
- if (!this.asyncIdleCellCnt.tryAcquire()) {
- return procResult.setFailResult(ErrorCode.HTTP_ASYNC_POOL_FULL);
- }
- boolean released = false;
try {
if (!this.messageCache.offer(asyncObj)) {
- this.asyncIdleCellCnt.release();
- released = true;
return
procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_FAIL);
}
return procResult.setSuccess();
} catch (Throwable ex) {
- if (!released) {
- this.asyncIdleCellCnt.release();
- }
if (asyncSendExptCnt.shouldPrint()) {
logger.warn("ClientMgr({}) async offer event exception",
this.sender.getSenderId(), ex);
}
@@ -498,7 +486,7 @@ public class HttpClientMgr implements ClientMgr {
logger.error("HttpAsync({}) report event
exception", workerId, ex);
}
} finally {
- asyncIdleCellCnt.release();
+
sender.releaseCachePermits(asyncObj.getHttpEvent().getBodySize());
if (procResult.isSuccess()) {
sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(),
asyncObj.getHttpEvent().getStreamId(),
asyncObj.getHttpEvent().getMsgCnt(),
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
index 90132018a7..1d6cfa800e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
@@ -36,6 +36,7 @@ public class TcpCallFuture implements MsgSendCallback {
private final String groupId;
private final String streamId;
private final int msgCnt;
+ private final int eventSize;
private final long rtTime;
private final String clientAddr;
private final long chanTerm;
@@ -53,6 +54,7 @@ public class TcpCallFuture implements MsgSendCallback {
this.streamId = encObject.getStreamId();
this.rtTime = encObject.getRtms();
this.msgCnt = encObject.getMsgCnt();
+ this.eventSize = encObject.getEventSize();
this.clientAddr = clientAddr;
this.chanTerm = chanTerm;
this.chanStr = chanStr;
@@ -132,4 +134,8 @@ public class TcpCallFuture implements MsgSendCallback {
public boolean isAsyncCall() {
return isAsyncCall;
}
+
+ public int getEventSize() {
+ return eventSize;
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index 0a9c74b616..0ad5e44e44 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -394,6 +394,7 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
this.descInflightMsgCnt(callFuture);
+ this.releaseAsyncCachedPermits(callFuture);
if (decObject.getSendResult().isSuccess()) {
baseSender.getMetricHolder().addCallbackSucMetric(callFuture.getGroupId(),
callFuture.getStreamId(), callFuture.getMsgCnt(),
@@ -475,6 +476,7 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ this.releaseAsyncCachedPermits(callFuture);
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.CONNECTION_BREAK.getErrCode(),
callFuture.getGroupId(), callFuture.getStreamId(),
callFuture.getMsgCnt(),
(System.currentTimeMillis() - curTime));
@@ -495,6 +497,7 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ this.releaseAsyncCachedPermits(callFuture);
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.CONNECTION_BREAK.getErrCode(),
callFuture.getGroupId(), callFuture.getStreamId(),
callFuture.getMsgCnt(),
(System.currentTimeMillis() - curTime));
@@ -578,6 +581,7 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ this.releaseAsyncCachedPermits(callFuture);
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
callFuture.getGroupId(), callFuture.getStreamId(),
callFuture.getMsgCnt(),
(System.currentTimeMillis() - curTime));
@@ -598,6 +602,7 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+ this.releaseAsyncCachedPermits(callFuture);
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
callFuture.getGroupId(), callFuture.getStreamId(),
callFuture.getMsgCnt(),
(System.currentTimeMillis() - curTime));
@@ -606,6 +611,12 @@ public class TcpClientMgr implements ClientMgr {
}
}
+ private void releaseAsyncCachedPermits(TcpCallFuture callFuture) {
+ if (callFuture.isAsyncCall()) {
+ baseSender.releaseCachePermits(callFuture.getEventSize());
+ }
+ }
+
private class MaintThread extends Thread {
private volatile boolean bShutDown;
@@ -729,9 +740,11 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
-
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SEND_WAIT_TIMEOUT.getErrCode(),
- future.getGroupId(), future.getStreamId(),
future.getMsgCnt(),
- (System.currentTimeMillis() - curTime));
+ releaseAsyncCachedPermits(future);
+ baseSender.getMetricHolder().addCallbackFailMetric(
+ ErrorCode.SEND_WAIT_TIMEOUT.getErrCode(),
+ future.getGroupId(), future.getStreamId(),
+ future.getMsgCnt(), (System.currentTimeMillis() -
curTime));
}
return;
}
@@ -753,6 +766,7 @@ public class TcpClientMgr implements ClientMgr {
}
} finally {
nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
+ releaseAsyncCachedPermits(future);
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SEND_WAIT_TIMEOUT.getErrCode(),
future.getGroupId(), future.getStreamId(),
future.getMsgCnt(),
(System.currentTimeMillis() - curTime));
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
index c521b04cd1..073e2b60c0 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
@@ -454,7 +454,7 @@ public class TcpNettyClient {
private EncodeObject buildHeartBeatMsg(String senderId, ProxyClientConfig
configure) {
EncodeObject encObject = new EncodeObject(null, null,
- MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis());
+ MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis(), 30);
encObject.setMessageIdInfo(0);
int intMsgType = encObject.getMsgType().getValue();
Map<String, String> newAttrs = new HashMap<>();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
index a32227c1af..3b44f9a3ce 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
@@ -44,6 +44,7 @@ public class EncodeObject {
private final long rtms;
private int messageId;
private int msgCnt = 0;
+ private int eventSize;
private int extField = 0;
private int attrDataLength = 0;
private byte[] attrData = null;
@@ -56,10 +57,11 @@ public class EncodeObject {
private boolean compress;
private byte[] aesKey;
- public EncodeObject(String groupId, String streamId, MsgType msgType, long
dtMs) {
+ public EncodeObject(String groupId, String streamId, MsgType msgType, long
dtMs, int eventSize) {
this.groupId = groupId;
this.streamId = streamId;
this.msgType = msgType;
+ this.eventSize = eventSize;
this.intMsgType = this.msgType.getValue();
this.rtms = System.currentTimeMillis();
if (this.msgType == MsgType.MSG_BIN_MULTI_BODY) {
@@ -185,4 +187,8 @@ public class EncodeObject {
public long getDtMs() {
return dtMs;
}
+
+ public int getEventSize() {
+ return eventSize;
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index 99e0c38289..cb4d2bae29 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -21,14 +21,17 @@ import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
import org.apache.inlong.sdk.dataproxy.config.HostInfo;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.metric.MetricDataHolder;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +79,8 @@ public abstract class BaseSender implements ConfigHolder {
protected volatile boolean idTransNum = false;
protected volatile int groupIdNum = 0;
private Map<String, Integer> streamIdMap = new HashMap<>();
+ private final PkgCacheQuota globalCacheQuota;
+ private final PkgCacheQuota sdkPkgCacheQuota;
// metric holder
protected MetricDataHolder metricHolder;
@@ -91,6 +96,13 @@ public abstract class BaseSender implements ConfigHolder {
this.configManager = new ProxyConfigManager(this.senderId,
this.baseConfig, this);
this.configManager.setDaemon(true);
this.metricHolder = new MetricDataHolder(this);
+ if (this.senderFactory == null) {
+ this.globalCacheQuota = null;
+ } else {
+ this.globalCacheQuota =
this.senderFactory.getFactoryPkgCacheQuota();
+ }
+ this.sdkPkgCacheQuota = new PkgCacheQuota(false, this.senderId,
+ configure.getMaxInFlightReqCnt(),
configure.getMaxInFlightSizeKb(), configure.getPaddingSize());
}
public boolean start(ProcessResult procResult) {
@@ -198,6 +210,10 @@ public abstract class BaseSender implements ConfigHolder {
return senderStatus.get() == SENDER_STATUS_STARTED;
}
+ public boolean isGenByFactory() {
+ return senderFactory != null;
+ }
+
public MsgSenderFactory getSenderFactory() {
return senderFactory;
}
@@ -242,6 +258,64 @@ public abstract class BaseSender implements ConfigHolder {
return proxyNodeInfos.size();
}
+ public Tuple2<Integer, Integer> getFactoryAvailQuota() {
+ if (senderFactory == null) {
+ return PkgCacheQuota.DISABLE_RET;
+ }
+ return globalCacheQuota.getPkgCacheAvailQuota();
+ }
+
+ public Tuple2<Integer, Integer> getSenderAvailQuota() {
+ return sdkPkgCacheQuota.getPkgCacheAvailQuota();
+ }
+
+ public int getFactoryPkgCntPermits() {
+ if (senderFactory == null) {
+ return SdkConsts.UNDEFINED_VALUE;
+ }
+ return senderFactory.getFactoryPkgCacheQuota().getPkgCntPermits();
+ }
+
+ public int getFactoryPkgSizeKbPermits() {
+ if (senderFactory == null) {
+ return SdkConsts.UNDEFINED_VALUE;
+ }
+ return senderFactory.getFactoryPkgCacheQuota().getPkgSizeKbPermits();
+ }
+
+ public int getSenderPkgCntPermits() {
+ return sdkPkgCacheQuota.getPkgCntPermits();
+ }
+
+ public int getSenderPkgSizeKbPermits() {
+ return sdkPkgCacheQuota.getPkgSizeKbPermits();
+ }
+
+ public boolean tryAcquireCachePermits(int sizeInByte, ProcessResult
procResult) {
+ if (this.globalCacheQuota == null) {
+ return this.sdkPkgCacheQuota.tryAcquire(sizeInByte, procResult);
+ } else {
+ if (this.globalCacheQuota.tryAcquire(sizeInByte, procResult)) {
+ if (this.sdkPkgCacheQuota.tryAcquire(sizeInByte, procResult)) {
+ return true;
+ } else {
+ this.globalCacheQuota.release(sizeInByte);
+ return false;
+ }
+ }
+ return false;
+ }
+ }
+
+ public void releaseCachePermits(int sizeInByte) {
+ if (this.globalCacheQuota == null) {
+ this.sdkPkgCacheQuota.release(sizeInByte);
+ } else {
+ this.sdkPkgCacheQuota.release(sizeInByte);
+ this.globalCacheQuota.release(sizeInByte);
+ }
+ }
+
public abstract int getActiveNodeCnt();
public abstract int getInflightMsgCnt();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
index d05f4c821e..2d5567324e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
@@ -56,8 +56,6 @@ public class HttpMsgSenderConfig extends ProxyClientConfig
implements Cloneable
private long httpCloseWaitPeriodMs =
SdkConsts.VAL_DEF_HTTP_SDK_CLOSE_WAIT_MS;
// node reuse freezing time
private long httpNodeReuseWaitIfFailMs =
SdkConsts.VAL_DEF_HTTP_REUSE_FAIL_WAIT_MS;
- // message cache size for async report data.
- private int httpAsyncRptCacheSize =
SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_SIZE;
// thread number for async report data.
private int httpAsyncRptWorkerNum =
SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_WORKER_NUM;
// interval for async worker in microseconds.
@@ -66,21 +64,25 @@ public class HttpMsgSenderConfig extends ProxyClientConfig
implements Cloneable
public HttpMsgSenderConfig(boolean visitMgrByHttps,
String managerIP, int managerPort, String groupId) throws
ProxySdkException {
super(visitMgrByHttps, managerIP, managerPort, groupId,
ReportProtocol.HTTP, null);
+ this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
}
public HttpMsgSenderConfig(String managerAddress, String groupId) throws
ProxySdkException {
super(managerAddress, groupId, ReportProtocol.HTTP, null);
+ this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
}
public HttpMsgSenderConfig(boolean visitMgrByHttps, String managerIP, int
managerPort,
String groupId, String mgrAuthSecretId, String mgrAuthSecretKey)
throws ProxySdkException {
super(visitMgrByHttps, managerIP, managerPort, groupId,
ReportProtocol.HTTP, null);
+ this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
this.setMgrAuthzInfo(true, mgrAuthSecretId, mgrAuthSecretKey);
}
public HttpMsgSenderConfig(String managerAddress,
String groupId, String mgrAuthSecretId, String mgrAuthSecretKey)
throws ProxySdkException {
super(managerAddress, groupId, ReportProtocol.HTTP, null);
+ this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
this.setMgrAuthzInfo(true, mgrAuthSecretId, mgrAuthSecretKey);
}
@@ -171,17 +173,11 @@ public class HttpMsgSenderConfig extends
ProxyClientConfig implements Cloneable
Math.max(SdkConsts.VAL_MIN_HTTP_REUSE_FAIL_WAIT_MS,
httpNodeReuseWaitIfFailMs));
}
- public int getHttpAsyncRptCacheSize() {
- return httpAsyncRptCacheSize;
- }
-
public int getHttpAsyncRptWorkerNum() {
return httpAsyncRptWorkerNum;
}
- public void setHttpAsyncRptPoolConfig(int httpAsyncRptCacheSize, int
httpAsyncRptWorkerNum) {
- this.httpAsyncRptCacheSize =
- Math.max(SdkConsts.VAL_MIN_HTTP_ASYNC_RPT_CACHE_SIZE,
httpAsyncRptCacheSize);
+ public void setHttpAsyncRptWorkerNum(int httpAsyncRptWorkerNum) {
this.httpAsyncRptWorkerNum =
Math.max(SdkConsts.VAL_MIN_HTTP_ASYNC_RPT_WORKER_NUM,
httpAsyncRptWorkerNum);
}
@@ -212,7 +208,6 @@ public class HttpMsgSenderConfig extends ProxyClientConfig
implements Cloneable
&& discardHttpCacheWhenClosing ==
that.discardHttpCacheWhenClosing
&& httpCloseWaitPeriodMs == that.httpCloseWaitPeriodMs
&& httpNodeReuseWaitIfFailMs == that.httpNodeReuseWaitIfFailMs
- && httpAsyncRptCacheSize == that.httpAsyncRptCacheSize
&& httpAsyncRptWorkerNum == that.httpAsyncRptWorkerNum
&& httpAsyncWorkerIdleWaitMs == that.httpAsyncWorkerIdleWaitMs
&& httpContentType == that.httpContentType
@@ -224,7 +219,7 @@ public class HttpMsgSenderConfig extends ProxyClientConfig
implements Cloneable
return Objects.hash(super.hashCode(), rptDataByHttps, httpContentType,
httpEventsSeparator, sepEventByLF, httpConTimeoutMs,
httpSocketTimeoutMs,
discardHttpCacheWhenClosing, httpCloseWaitPeriodMs,
httpNodeReuseWaitIfFailMs,
- httpAsyncRptCacheSize, httpAsyncRptWorkerNum,
httpAsyncWorkerIdleWaitMs);
+ httpAsyncRptWorkerNum, httpAsyncWorkerIdleWaitMs);
}
@Override
@@ -253,7 +248,6 @@ public class HttpMsgSenderConfig extends ProxyClientConfig
implements Cloneable
.append(",
discardHttpCacheWhenClosing=").append(discardHttpCacheWhenClosing)
.append(",
httpCloseWaitPeriodMs=").append(httpCloseWaitPeriodMs)
.append(",
httpNodeReuseWaitIfFailMs=").append(httpNodeReuseWaitIfFailMs)
- .append(",
httpAsyncRptCacheSize=").append(httpAsyncRptCacheSize)
.append(",
httpAsyncRptWorkerNum=").append(httpAsyncRptWorkerNum)
.append(",
httpAsyncWorkerIdleWaitMs=").append(httpAsyncWorkerIdleWaitMs);
return super.getSetting(strBuff);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
index b764e40b52..be4fd033c7 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
@@ -82,10 +82,15 @@ public class InLongHttpMsgSender extends BaseSender
implements HttpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
+ boolean gotPermits = false;
try {
if (this.isMetaInfoUnReady()) {
return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
}
+ if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+ return false;
+ }
+ gotPermits = true;
// check package length
if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(),
procResult)) {
return false;
@@ -96,6 +101,9 @@ public class InLongHttpMsgSender extends BaseSender
implements HttpMsgSender {
metricHolder.addAsyncHttpSucPutMetric(
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
} else {
+ if (gotPermits) {
+ releaseCachePermits(eventInfo.getBodySize());
+ }
metricHolder.addAsyncHttpFailPutMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
index 8add9f15f3..ea271cb8ec 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -79,10 +79,18 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
+ boolean gotPermits = false;
long curTime = System.currentTimeMillis();
try {
+ if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+ return false;
+ }
+ gotPermits = true;
return processEvent(SendQos.SOURCE_ACK, eventInfo, null,
procResult);
} finally {
+ if (gotPermits) {
+ releaseCachePermits(eventInfo.getBodySize());
+ }
if (procResult.isSuccess()) {
metricHolder.addSyncSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
@@ -100,13 +108,21 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
+ boolean gotPermits = false;
try {
+ if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+ return false;
+ }
+ gotPermits = true;
return processEvent(SendQos.SOURCE_ACK, eventInfo, callback,
procResult);
} finally {
if (procResult.isSuccess()) {
metricHolder.addAsyncSucReqMetric(
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
} else {
+ if (gotPermits) {
+ releaseCachePermits(eventInfo.getBodySize());
+ }
metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
@@ -139,10 +155,18 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
+ boolean gotPermits = false;
long curTime = System.currentTimeMillis();
try {
+ if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+ return false;
+ }
+ gotPermits = true;
return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
} finally {
+ if (gotPermits) {
+ releaseCachePermits(eventInfo.getBodySize());
+ }
if (procResult.isSuccess()) {
metricHolder.addSyncSucMetric(eventInfo.getGroupId(),
eventInfo.getStreamId(),
eventInfo.getMsgCnt(), (System.currentTimeMillis() -
curTime));
@@ -160,13 +184,21 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (!this.isStarted()) {
return procResult.setFailResult(ErrorCode.SDK_CLOSED);
}
+ boolean gotPermits = false;
try {
+ if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+ return false;
+ }
+ gotPermits = true;
return processEvent(SendQos.SINK_ACK, eventInfo, callback,
procResult);
} finally {
if (procResult.isSuccess()) {
metricHolder.addAsyncSucReqMetric(
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
} else {
+ if (gotPermits) {
+ releaseCachePermits(eventInfo.getBodySize());
+ }
metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
eventInfo.getGroupId(), eventInfo.getStreamId(),
eventInfo.getMsgCnt());
}
@@ -188,8 +220,9 @@ public class InLongTcpMsgSender extends BaseSender
implements TcpMsgSender {
if (this.isMetaInfoUnReady()) {
return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
}
- EncodeObject encObject = new EncodeObject(eventInfo.getGroupId(),
- eventInfo.getStreamId(), tcpConfig.getSdkMsgType(),
eventInfo.getDtMs());
+ EncodeObject encObject =
+ new EncodeObject(eventInfo.getGroupId(),
eventInfo.getStreamId(),
+ tcpConfig.getSdkMsgType(), eventInfo.getDtMs(),
eventInfo.getBodySize());
// pre-process attributes
processEventAttrsInfo(sendQos, eventInfo, encObject);
// check package length
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/PkgCacheQuotaTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/PkgCacheQuotaTest.java
new file mode 100644
index 0000000000..6803e70cd1
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/PkgCacheQuotaTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PkgCacheQuotaTest {
+
+ @Test
+ public void testPkgCacheQuota() throws Exception {
+ ProcessResult procResult = new ProcessResult();
+ PkgCacheQuota quota1 =
+ new PkgCacheQuota(false, "q1", -1, -1, 20);
+ Assert.assertTrue(quota1.tryAcquire(30, procResult));
+ Assert.assertTrue(quota1.tryAcquire(1000000, procResult));
+ quota1.release(30);
+ Tuple2<Integer, Integer> result = quota1.getPkgCacheAvailQuota();
+ Assert.assertEquals(result.getF0().intValue(),
SdkConsts.UNDEFINED_VALUE);
+ Assert.assertEquals(result.getF1().intValue(),
SdkConsts.UNDEFINED_VALUE);
+
+ PkgCacheQuota quota2 = new PkgCacheQuota(true, "q2", 3, -1, 20);
+ Assert.assertTrue(quota2.tryAcquire(30, procResult));
+ Assert.assertTrue(quota2.tryAcquire(50, procResult));
+ Assert.assertTrue(quota2.tryAcquire(40, procResult));
+ Assert.assertFalse(quota2.tryAcquire(50, procResult));
+ result = quota2.getPkgCacheAvailQuota();
+ Assert.assertNotNull(result);
+ Assert.assertNotNull(result.getF0());
+ Assert.assertEquals(result.getF1().intValue(),
SdkConsts.UNDEFINED_VALUE);
+ Assert.assertEquals(result.getF0().intValue(), 0);
+ quota2.release(50);
+ result = quota2.getPkgCacheAvailQuota();
+ Assert.assertEquals(result.getF0().intValue(), 1);
+
+ PkgCacheQuota quota3 = new PkgCacheQuota(false, "q3", -1, 5, 1014);
+ Assert.assertTrue(quota3.tryAcquire(10, procResult));
+ Assert.assertTrue(quota3.tryAcquire(20, procResult));
+ Assert.assertTrue(quota3.tryAcquire(30, procResult));
+ Assert.assertFalse(quota3.tryAcquire(40000, procResult));
+ Assert.assertFalse(quota3.tryAcquire(50, procResult));
+ result = quota3.getPkgCacheAvailQuota();
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.getF0().intValue(),
SdkConsts.UNDEFINED_VALUE);
+ Assert.assertEquals(result.getF1().intValue(), 0);
+ quota3.release(50);
+ result = quota3.getPkgCacheAvailQuota();
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.getF0().intValue(),
SdkConsts.UNDEFINED_VALUE);
+ Assert.assertEquals(result.getF1().intValue(), 2);
+
+ PkgCacheQuota quota4 = new PkgCacheQuota(true, "pa5", 5, 5, 1014);
+ Assert.assertTrue(quota4.tryAcquire(10, procResult));
+ Assert.assertTrue(quota4.tryAcquire(20, procResult));
+ Assert.assertTrue(quota4.tryAcquire(30, procResult));
+ Assert.assertFalse(quota4.tryAcquire(40000, procResult));
+ Assert.assertFalse(quota4.tryAcquire(50, procResult));
+ result = quota4.getPkgCacheAvailQuota();
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.getF0().intValue(), 2);
+ Assert.assertEquals(result.getF1().intValue(), 0);
+ quota4.release(50);
+ result = quota4.getPkgCacheAvailQuota();
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.getF0().intValue(), 3);
+ Assert.assertEquals(result.getF1().intValue(), 2);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
index 68a08a00b9..b314f8a6e4 100644
---
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
+++
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
@@ -29,13 +29,15 @@ public class ProxyClientConfigTest {
public void testManagerConfig() throws Exception {
HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig(
"http://127.0.0.1:800", "test_id", "secretId", "secretKey");
- httpConfig.setHttpAsyncRptPoolConfig(30, 20);
+ httpConfig.setMaxInFlightReqCnt(30);
+ httpConfig.setHttpAsyncRptWorkerNum(20);
HttpMsgSenderConfig httpConfig1 = httpConfig.clone();
Assert.assertEquals(httpConfig, httpConfig1);
httpConfig1.setRegionName("sz");
- httpConfig1.setHttpAsyncRptPoolConfig(50, 10);
+ httpConfig.setMaxInFlightReqCnt(50);
+ httpConfig.setHttpAsyncRptWorkerNum(10);
Assert.assertNotEquals(httpConfig1.getRegionName(),
httpConfig.getRegionName());
- Assert.assertNotEquals(httpConfig1.getHttpAsyncRptCacheSize(),
httpConfig.getHttpAsyncRptCacheSize());
+ Assert.assertNotEquals(httpConfig1.getMaxInFlightReqCnt(),
httpConfig.getMaxInFlightReqCnt());
Assert.assertNotEquals(httpConfig1.getHttpAsyncRptWorkerNum(),
httpConfig.getHttpAsyncRptWorkerNum());
httpConfig.setRptDataByHttps(true);
httpConfig.setMetaCacheExpiredMs(30000);