This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dd9328c275 [INLONG-11754][SDK] Add the total number of in-flight 
requests and total size limits (#11755)
dd9328c275 is described below

commit dd9328c275b2342cfece3f1ed0939ce43b2b71cb
Author: Goson Zhang <[email protected]>
AuthorDate: Thu Feb 13 13:00:46 2025 +0800

    [INLONG-11754][SDK] Add the total number of in-flight requests and total 
size limits (#11755)
    
    
    
    ---------
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../apache/inlong/agent/core/HeartbeatManager.java |   3 +-
 .../plugin/sinks/filecollect/SenderManager.java    |   2 +-
 .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java |  15 +-
 .../inlong/sdk/dataproxy/MsgSenderFactory.java     |   8 +
 .../sdk/dataproxy/MsgSenderMultiFactory.java       |  14 +-
 .../sdk/dataproxy/MsgSenderSingleFactory.java      |  14 +-
 .../inlong/sdk/dataproxy/common/ErrorCode.java     |  14 +-
 .../sdk/dataproxy/common/ProxyClientConfig.java    |  49 +++++-
 .../inlong/sdk/dataproxy/common/SdkConsts.java     |  13 +-
 .../sdk/dataproxy/metric/MetricDataHolder.java     |  18 ++-
 .../sdk/dataproxy/network/PkgCacheQuota.java       | 171 +++++++++++++++++++++
 .../sdk/dataproxy/network/http/HttpClientMgr.java  |  18 +--
 .../sdk/dataproxy/network/tcp/TcpCallFuture.java   |   6 +
 .../sdk/dataproxy/network/tcp/TcpClientMgr.java    |  20 ++-
 .../sdk/dataproxy/network/tcp/TcpNettyClient.java  |   2 +-
 .../dataproxy/network/tcp/codec/EncodeObject.java  |   8 +-
 .../inlong/sdk/dataproxy/sender/BaseSender.java    |  74 +++++++++
 .../dataproxy/sender/http/HttpMsgSenderConfig.java |  18 +--
 .../dataproxy/sender/http/InLongHttpMsgSender.java |   8 +
 .../dataproxy/sender/tcp/InLongTcpMsgSender.java   |  37 ++++-
 .../inlong/sdk/dataproxy/PkgCacheQuotaTest.java    |  88 +++++++++++
 .../sdk/dataproxy/ProxyClientConfigTest.java       |   8 +-
 22 files changed, 552 insertions(+), 56 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 50282be586..82cd5cb648 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -201,7 +201,8 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
         try {
             proxyClientConfig = new TcpMsgSenderConfig(managerAddr,
                     INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
-            
proxyClientConfig.setSendBufferSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
+            proxyClientConfig.setMaxInFlightSizeInKb(
+                    CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE / 
1024);
             
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
             
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
             proxyClientConfig.setRequestTimeoutMs(30000L);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 34c1b67f70..76593b7512 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -203,7 +203,7 @@ public class SenderManager {
     private void createMessageSender() throws Exception {
         TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
                 managerAddr, inlongGroupId, authSecretId, authSecretKey);
-        proxyClientConfig.setSendBufferSize(totalAsyncBufSize);
+        proxyClientConfig.setMaxInFlightSizeInKb(totalAsyncBufSize / 1024);
         proxyClientConfig.setAliveConnections(aliveConnectionNum);
         proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);
         proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 623a318188..ce276b9cf6 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -19,9 +19,11 @@ package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
 import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -52,6 +54,7 @@ public class BaseMsgSenderFactory {
     // msg send factory
     private final MsgSenderFactory msgSenderFactory;
     private final String factoryNo;
+    private final PkgCacheQuota pkgCacheQuota;
     // for senders
     private final ReentrantReadWriteLock senderCacheLock = new 
ReentrantReadWriteLock();
     // for inlong groupId -- Sender map
@@ -59,10 +62,14 @@ public class BaseMsgSenderFactory {
     // for inlong clusterId -- Sender map
     private final ConcurrentHashMap<String, BaseSender> clusterIdSenderMap = 
new ConcurrentHashMap<>();
 
-    public BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory, String 
factoryNo) {
+    protected BaseMsgSenderFactory(MsgSenderFactory msgSenderFactory,
+            String factoryNo, int factoryPkgCntPermits, int 
factoryPkgSizeKbPermits) {
         this.msgSenderFactory = msgSenderFactory;
         this.factoryNo = factoryNo;
-        logger.info("MsgSenderFactory({}) started", this.factoryNo);
+        this.pkgCacheQuota = new PkgCacheQuota(true, factoryNo,
+                factoryPkgCntPermits, factoryPkgSizeKbPermits, 
SdkConsts.VAL_DEF_PADDING_SIZE);
+        logger.info("MsgSenderFactory({}) started, factoryPkgCntPermits={}, 
factoryPkgSizeKbPermits={}",
+                this.factoryNo, factoryPkgCntPermits, factoryPkgSizeKbPermits);
     }
 
     public void close() {
@@ -105,6 +112,10 @@ public class BaseMsgSenderFactory {
         }
     }
 
+    public PkgCacheQuota getPkgCacheQuota() {
+        return pkgCacheQuota;
+    }
+
     public int getMsgSenderCount() {
         return groupIdSenderMap.size() + clusterIdSenderMap.size();
     }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
index 7fbae90964..3d7b8fb4a5 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
 import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -53,6 +54,13 @@ public interface MsgSenderFactory {
      */
     int getMsgSenderCount();
 
+    /**
+     * Get factory level inflight request quota
+     *
+     * @return the factory level inflight request quota
+     */
+    PkgCacheQuota getFactoryPkgCacheQuota();
+
     /**
      * Get or generate a sender from the factory according to groupId
      *
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index 559b7d7bac..7d4f9d5110 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
 import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -41,8 +43,13 @@ public class MsgSenderMultiFactory implements 
MsgSenderFactory {
     private final BaseMsgSenderFactory baseMsgSenderFactory;
 
     public MsgSenderMultiFactory() {
+        this(SdkConsts.UNDEFINED_VALUE, SdkConsts.UNDEFINED_VALUE);
+    }
+
+    public MsgSenderMultiFactory(int factoryPkgCntPermits, int 
factoryPkgSizeKbPermits) {
         this.baseMsgSenderFactory = new BaseMsgSenderFactory(this,
-                "iMultiFact-" + ProxyUtils.getProcessPid() + "-" + 
refCounter.incrementAndGet());
+                "iMultiFact-" + ProxyUtils.getProcessPid() + "-" + 
refCounter.incrementAndGet(),
+                factoryPkgCntPermits, factoryPkgSizeKbPermits);
         this.initialized.set(true);
     }
 
@@ -69,6 +76,11 @@ public class MsgSenderMultiFactory implements 
MsgSenderFactory {
         return this.baseMsgSenderFactory.getMsgSenderCount();
     }
 
+    @Override
+    public PkgCacheQuota getFactoryPkgCacheQuota() {
+        return baseMsgSenderFactory.getPkgCacheQuota();
+    }
+
     @Override
     public InLongTcpMsgSender genTcpSenderByGroupId(
             TcpMsgSenderConfig configure) throws ProxySdkException {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index 4e99c6550b..4d795101c2 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.inlong.sdk.dataproxy;
 
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
 import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
@@ -42,10 +44,15 @@ public class MsgSenderSingleFactory implements 
MsgSenderFactory {
     private static BaseMsgSenderFactory baseMsgSenderFactory;
 
     public MsgSenderSingleFactory() {
+        this(SdkConsts.UNDEFINED_VALUE, SdkConsts.UNDEFINED_VALUE);
+    }
+
+    public MsgSenderSingleFactory(int factoryPkgCntPermits, int 
factoryPkgSizeKbPermits) {
         synchronized (singletonRefCounter) {
             if (singletonRefCounter.incrementAndGet() == 1) {
                 baseMsgSenderFactory = new BaseMsgSenderFactory(this,
-                        "iSingleFct-" + ProxyUtils.getProcessPid() + "-" + 
refCounter.incrementAndGet());
+                        "iSingleFct-" + ProxyUtils.getProcessPid() + "-" + 
refCounter.incrementAndGet(),
+                        factoryPkgCntPermits, factoryPkgSizeKbPermits);
                 initialized.set(true);
             }
         }
@@ -86,6 +93,11 @@ public class MsgSenderSingleFactory implements 
MsgSenderFactory {
         return baseMsgSenderFactory.getMsgSenderCount();
     }
 
+    @Override
+    public PkgCacheQuota getFactoryPkgCacheQuota() {
+        return baseMsgSenderFactory.getPkgCacheQuota();
+    }
+
     @Override
     public InLongTcpMsgSender genTcpSenderByGroupId(
             TcpMsgSenderConfig configure) throws ProxySdkException {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index b1dd6f26c3..379d97c4f4 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -63,6 +63,13 @@ public enum ErrorCode {
     //
     FETCH_PROXY_META_FAILURE(59, "Fetch dataproxy meta info failure"),
     FETCH_ENCRYPT_META_FAILURE(60, "Fetch encrypt meta info failure"),
+
+    //
+    INF_REQ_COUNT_REACH_FACTORY_LIMIT(71, "In-flight Request count reach 
factory limit"),
+    INF_REQ_SIZE_REACH_FACTORY_LIMIT(72, "In-flight Request size reach factory 
limit"),
+    INF_REQ_COUNT_REACH_SDK_LIMIT(73, "In-flight Request count reach sdk 
limit"),
+    INF_REQ_SIZE_REACH_SDK_LIMIT(74, "In-flight Request size reach sdk limit"),
+
     //
     NO_NODE_META_INFOS(81, "No proxy node metadata info in local"),
     EMPTY_ACTIVE_NODE_SET(82, "Empty active node set"),
@@ -94,10 +101,9 @@ public enum ErrorCode {
     //
     DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
     //
-    HTTP_ASYNC_POOL_FULL(171, "Http async pool full"),
-    HTTP_ASYNC_OFFER_FAIL(172, "Http async offer event fail"),
-    HTTP_ASYNC_OFFER_EXCEPTION(173, "Http async offer event exception"),
-    HTTP_BUILD_CLIENT_EXCEPTION(174, "Http build client exception"),
+    HTTP_ASYNC_OFFER_FAIL(171, "Http async offer event fail"),
+    HTTP_ASYNC_OFFER_EXCEPTION(172, "Http async offer event exception"),
+    HTTP_BUILD_CLIENT_EXCEPTION(173, "Http build client exception"),
     //
     BUILD_FORM_CONTENT_EXCEPTION(181, "Build form content exception"),
     DP_RETURN_FAILURE(182, "DataProxy return failure"),
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
index 4079eaf81e..1065bbf259 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ProxyClientConfig.java
@@ -85,6 +85,11 @@ public class ProxyClientConfig implements Cloneable {
     private int aliveConnections = SdkConsts.VAL_DEF_ALIVE_CONNECTIONS;
     // node forced selection interval ms
     private long forceReChooseInrMs = SdkConsts.VAL_DEF_FORCE_CHOOSE_INR_MS;
+    // max inflight request count and size per SDK
+    private int maxInFlightReqCnt = SdkConsts.UNDEFINED_VALUE;
+    private int maxInFlightSizeKb = SdkConsts.UNDEFINED_VALUE;
+    private int paddingSize = SdkConsts.VAL_DEF_PADDING_SIZE;
+
     // metric setting
     private final MetricConfig metricConfig = new MetricConfig();
     // report setting
@@ -304,7 +309,6 @@ public class ProxyClientConfig implements Cloneable {
     public void setMetaSyncWaitMsIfFail(int metaSyncWaitMsIfFail) {
         this.metaSyncWaitMsIfFail = 
Math.min(SdkConsts.VAL_MAX_WAIT_MS_IF_CONFIG_REQ_FAIL,
                 Math.max(SdkConsts.VAL_MIN_WAIT_MS_IF_CONFIG_REQ_FAIL, 
metaSyncWaitMsIfFail));
-
     }
 
     public String getMetaStoreBasePath() {
@@ -389,6 +393,40 @@ public class ProxyClientConfig implements Cloneable {
         this.metricConfig.setMetricKeyMaskInfos(maskGroupId, maskStreamId);
     }
 
+    public int getMaxInFlightReqCnt() {
+        return maxInFlightReqCnt;
+    }
+
+    public int getMaxInFlightSizeKb() {
+        return maxInFlightSizeKb;
+    }
+
+    public void setMaxInFlightReqCnt(int maxInFlightReqCnt) {
+        if (maxInFlightReqCnt > 0) {
+            this.maxInFlightReqCnt = maxInFlightReqCnt;
+        }
+    }
+
+    public void setMaxInFlightSizeInKb(int maxInFlightSizeInKb) {
+        if (maxInFlightSizeInKb > 0) {
+            this.maxInFlightSizeKb = maxInFlightSizeInKb;
+        }
+    }
+
+    public void setMaxInFlightPermitsPerSdk(int maxInFlightReqCnt, int 
maxInFlightSizeInKb) {
+        setMaxInFlightReqCnt(maxInFlightReqCnt);
+        setMaxInFlightSizeInKb(maxInFlightSizeInKb);
+    }
+
+    public int getPaddingSize() {
+        return paddingSize;
+    }
+
+    public void setPaddingSize(int paddingSize) {
+        this.paddingSize = Math.min(SdkConsts.VAL_MAX_PADDING_SIZE,
+                Math.max(SdkConsts.VAL_MIN_PADDING_SIZE, paddingSize));
+    }
+
     public MetricConfig getMetricConfig() {
         return metricConfig;
     }
@@ -413,6 +451,9 @@ public class ProxyClientConfig implements Cloneable {
                 && aliveConnections == that.aliveConnections
                 && forceReChooseInrMs == that.forceReChooseInrMs
                 && enableReportAuthz == that.enableReportAuthz
+                && maxInFlightReqCnt == that.maxInFlightReqCnt
+                && maxInFlightSizeKb == that.maxInFlightSizeKb
+                && paddingSize == that.paddingSize
                 && enableReportEncrypt == that.enableReportEncrypt
                 && Objects.equals(tlsVersion, that.tlsVersion)
                 && Objects.equals(managerIP, that.managerIP)
@@ -439,7 +480,8 @@ public class ProxyClientConfig implements Cloneable {
                 mgrMetaSyncInrMs, metaSyncMaxRetryIfFail, metaStoreBasePath,
                 metaCacheExpiredMs, metaQryFailCacheExpiredMs, 
aliveConnections,
                 forceReChooseInrMs, metricConfig, enableReportAuthz, 
enableReportEncrypt,
-                rptRsaPubKeyUrl, rptUserName, rptSecretKey);
+                rptRsaPubKeyUrl, rptUserName, rptSecretKey, maxInFlightReqCnt,
+                maxInFlightSizeKb, paddingSize);
     }
 
     @Override
@@ -476,6 +518,9 @@ public class ProxyClientConfig implements Cloneable {
                 .append(", forceReChooseInrMs=").append(forceReChooseInrMs)
                 .append(", enableReportAuthz=").append(enableReportAuthz)
                 .append(", enableReportEncrypt=").append(enableReportEncrypt)
+                .append(", maxInFlightReqCnt=").append(maxInFlightReqCnt)
+                .append(", maxInFlightSizeKb=").append(maxInFlightSizeKb)
+                .append(", paddingSize=").append(paddingSize)
                 .append(", rptRsaPubKeyUrl='").append(rptRsaPubKeyUrl)
                 .append("', rptUserName='").append(rptUserName)
                 .append("', rptSecretKey='").append(rptSecretKey)
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index 7979851d3e..ae4d5e64dd 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -36,6 +36,8 @@ public class SdkConsts {
     public static final String VAL_DEF_REGION_NAME = "";
     // http report method
     public static final String DATAPROXY_REPORT_METHOD = "/dataproxy/message";
+    // undefined value
+    public static final int UNDEFINED_VALUE = -1;
     // config info sync interval in minutes
     public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
     public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
@@ -110,9 +112,8 @@ public class SdkConsts {
     public static final long VAL_DEF_HTTP_REUSE_FAIL_WAIT_MS = 20000L;
     public static final long VAL_MAX_HTTP_REUSE_FAIL_WAIT_MS = 300000L;
     public static final long VAL_MIN_HTTP_REUSE_FAIL_WAIT_MS = 1000L;
-    // HTTP async report request cache size
-    public static final int VAL_DEF_HTTP_ASYNC_RPT_CACHE_SIZE = 2000;
-    public static final int VAL_MIN_HTTP_ASYNC_RPT_CACHE_SIZE = 1;
+    // HTTP async report request cache request count
+    public static final int VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT = 2000;
     // HTTP async report worker thread count
     public static final int VAL_DEF_HTTP_ASYNC_RPT_WORKER_NUM =
             Runtime.getRuntime().availableProcessors();
@@ -133,4 +134,10 @@ public class SdkConsts {
 
     /* Reserved attribute data size(bytes). */
     public static int RESERVED_ATTRIBUTE_LENGTH = 256;
+    // unit KB
+    public static final int UNIT_KB_SIZE = 1024;
+    // padding size per package
+    public static final int VAL_DEF_PADDING_SIZE = 200;
+    public static final int VAL_MIN_PADDING_SIZE = 0;
+    public static final int VAL_MAX_PADDING_SIZE = 4096;
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
index 4b16f62c8d..627ab30dfb 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/metric/MetricDataHolder.java
@@ -22,8 +22,8 @@ import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
 import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +40,6 @@ public class MetricDataHolder implements Runnable {
 
     private static final String DEFAULT_KEY_SPLITTER = "#";
     private static final Logger logger = 
LoggerFactory.getLogger(MetricDataHolder.class);
-    private static final LogCounter exceptCnt = new LogCounter(5, 100000, 60 * 
1000L);
 
     private final MetricConfig metricConfig;
     private final BaseSender sender;
@@ -457,10 +456,22 @@ public class MetricDataHolder implements Runnable {
                 .append(",\"lrT\":").append(lstReportTime)
                 .append(",");
         metricUnit.getAndResetValue(strBuff);
+        Tuple2<Integer, Integer> factoryAvailQuota = 
sender.getFactoryAvailQuota();
+        Tuple2<Integer, Integer> senderAvailQuota = 
sender.getSenderAvailQuota();
         strBuff.append(",\"s\":{\"tNs\":").append(sender.getProxyNodeCnt())
                 .append(",\"aNs\":").append(sender.getActiveNodeCnt())
                 .append(",\"ifRs\":").append(sender.getInflightMsgCnt())
+                .append(",\"afPc\":").append(factoryAvailQuota.getF0())
+                .append(",\"afPs\":").append(factoryAvailQuota.getF1())
+                .append(",\"aPc\":").append(senderAvailQuota.getF0())
+                .append(",\"aPs\":").append(senderAvailQuota.getF1())
                 
.append("},\"c\":{\"aC\":").append(sender.getConfigure().getAliveConnections())
+                .append(",\"gBf\":").append(sender.isGenByFactory())
+                .append(",\"ifCc\":").append(sender.getFactoryPkgCntPermits())
+                .append(",\"ifCs\":").append(sender.getFactoryPkgCntPermits())
+                
.append(",\"iCc\":").append(sender.getConfigure().getMaxInFlightReqCnt())
+                
.append(",\"iCs\":").append(sender.getConfigure().getMaxInFlightSizeKb())
+                
.append(",\"iRp\":").append(sender.getConfigure().getPaddingSize())
                 
.append(",\"rP\":\"").append(sender.getConfigure().getDataRptProtocol())
                 
.append("\",\"rG\":\"").append(sender.getConfigure().getRegionName())
                 .append("\"");
@@ -481,8 +492,7 @@ public class MetricDataHolder implements Runnable {
             strBuff.append(",\"iHs\":").append(httpConfig.isRptDataByHttps())
                     
.append(",\"sOt\":").append(httpConfig.getHttpSocketTimeoutMs())
                     
.append(",\"cOt\":").append(httpConfig.getHttpConTimeoutMs())
-                    
.append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum())
-                    
.append(",\"aC\":").append(httpConfig.getHttpAsyncRptCacheSize());
+                    
.append(",\"aWk\":").append(httpConfig.getHttpAsyncRptWorkerNum());
         }
         String content = strBuff.append("}}").toString();
         strBuff.delete(0, strBuff.length());
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/PkgCacheQuota.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/PkgCacheQuota.java
new file mode 100644
index 0000000000..424ec41554
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/PkgCacheQuota.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Package cache quota: limits in-flight requests by package count and by total size in KB.
+ * A limit is enforced only if its permits value is positive; with neither set the quota is disabled.
+ * Acquisition never blocks; release(size) must mirror a prior successful tryAcquire(size).
+ */
+public class PkgCacheQuota {
+
+    private static final Logger logger = LoggerFactory.getLogger(PkgCacheQuota.class);
+    public static final Tuple2<Integer, Integer> DISABLE_RET =
+            new Tuple2<>(SdkConsts.UNDEFINED_VALUE, SdkConsts.UNDEFINED_VALUE);
+    // true: factory level quota, false: sdk level quota; selects the reported error codes
+    private final boolean factoryLevel;
+    // configured package count permits, non-positive means the count is not limited
+    private final int pkgCntPermits;
+    // configured package size permits in KB, non-positive means the size is not limited
+    private final int pkgSizeKbPermits;
+    // extra bytes added to each package size before it is converted to KB permits
+    private final int paddingSizePerPkg;
+    // true if neither the count limit nor the size limit is enabled
+    private final boolean disabled;
+    // available package count permits, null if the count is not limited
+    private final Semaphore pkgCntQuota;
+    // available package size permits in KB, null if the size is not limited
+    private final Semaphore pkgSizeKbQuota;
+
+    public PkgCacheQuota(boolean factoryLevel, String parentId,
+            int pkgCntPermits, int pkgSizeKbPermits, int paddingSizePerPkg) {
+        this.factoryLevel = factoryLevel;
+        this.pkgCntPermits = pkgCntPermits;
+        this.pkgSizeKbPermits = pkgSizeKbPermits;
+        this.paddingSizePerPkg = paddingSizePerPkg;
+        if (pkgCntPermits > 0) { // a semaphore is created only for an enabled limit
+            this.pkgCntQuota = new Semaphore(pkgCntPermits);
+        } else {
+            this.pkgCntQuota = null;
+        }
+        if (pkgSizeKbPermits > 0) {
+            this.pkgSizeKbQuota = new Semaphore(pkgSizeKbPermits);
+        } else {
+            this.pkgSizeKbQuota = null;
+        }
+        this.disabled = (this.pkgCntQuota == null
+                && this.pkgSizeKbQuota == null);
+        logger.info("PkgCacheQuota({}) created, factoryLevel={},pkgCnt={},pkgSizeKb={},padSize={}",
+                parentId, factoryLevel, pkgCntPermits, pkgSizeKbPermits, paddingSizePerPkg);
+    }
+
+    public Tuple2<Integer, Integer> getPkgCacheAvailQuota() { // (available count, available size in KB)
+        if (this.disabled) {
+            return DISABLE_RET;
+        }
+        int cntAvailable = SdkConsts.UNDEFINED_VALUE; // stays UNDEFINED_VALUE if the count is not limited
+        if (this.pkgCntQuota != null) {
+            cntAvailable = this.pkgCntQuota.availablePermits();
+        }
+        int sizeAvailable = SdkConsts.UNDEFINED_VALUE; // stays UNDEFINED_VALUE if the size is not limited
+        if (this.pkgSizeKbQuota != null) {
+            sizeAvailable = this.pkgSizeKbQuota.availablePermits();
+        }
+        return new Tuple2<>(cntAvailable, sizeAvailable);
+    }
+
+    public boolean tryAcquire(int sizeInByte, ProcessResult procResult) { // non-blocking, outcome in procResult
+        if (this.disabled) {
+            return procResult.setSuccess();
+        }
+        if (this.pkgCntQuota == null) { // only the size limit is enabled
+            if (this.pkgSizeKbQuota.tryAcquire(
+                    getSizeKbPermitsByBytes(sizeInByte))) {
+                return procResult.setSuccess();
+            }
+            return procResult.setFailResult(factoryLevel
+                    ? ErrorCode.INF_REQ_SIZE_REACH_FACTORY_LIMIT
+                    : ErrorCode.INF_REQ_SIZE_REACH_SDK_LIMIT);
+        } else {
+            if (this.pkgCntQuota.tryAcquire(1)) { // take the count permit first, then the size permits
+                if (this.pkgSizeKbQuota == null) {
+                    return procResult.setSuccess();
+                } else {
+                    if (this.pkgSizeKbQuota.tryAcquire(
+                            getSizeKbPermitsByBytes(sizeInByte))) {
+                        return procResult.setSuccess();
+                    } else {
+                        this.pkgCntQuota.release(); // roll back the count permit taken above
+                        return procResult.setFailResult(factoryLevel
+                                ? ErrorCode.INF_REQ_SIZE_REACH_FACTORY_LIMIT
+                                : ErrorCode.INF_REQ_SIZE_REACH_SDK_LIMIT);
+                    }
+                }
+            } else {
+                return procResult.setFailResult(factoryLevel
+                        ? ErrorCode.INF_REQ_COUNT_REACH_FACTORY_LIMIT
+                        : ErrorCode.INF_REQ_COUNT_REACH_SDK_LIMIT);
+            }
+        }
+    }
+
+    public void release(int sizeInByte) { // pass the same size that was given to tryAcquire
+        if (this.disabled) {
+            return;
+        }
+        if (this.pkgCntQuota == null) {
+            this.pkgSizeKbQuota.release(
+                    getSizeKbPermitsByBytes(sizeInByte));
+        } else {
+            if (this.pkgSizeKbQuota != null) {
+                this.pkgSizeKbQuota.release(
+                        getSizeKbPermitsByBytes(sizeInByte));
+            }
+            this.pkgCntQuota.release();
+        }
+    }
+
+    public boolean isFactoryLevel() {
+        return factoryLevel;
+    }
+
+    public int getPkgCntPermits() {
+        return pkgCntPermits;
+    }
+
+    public int getPkgSizeKbPermits() {
+        return pkgSizeKbPermits;
+    }
+
+    public int getPaddingSizePerPkg() {
+        return paddingSizePerPkg;
+    }
+
+    public boolean isDisabled() {
+        return disabled;
+    }
+
+    private int getSizeKbPermitsByBytes(int sizeInByte) { // (size + padding) in KB, rounded up
+        int tmpValue = sizeInByte + this.paddingSizePerPkg;
+        if (tmpValue % SdkConsts.UNIT_KB_SIZE == 0) {
+            return (tmpValue / SdkConsts.UNIT_KB_SIZE);
+        } else {
+            return (tmpValue / SdkConsts.UNIT_KB_SIZE) + 1;
+        }
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
index 1700e394d5..cc325290a6 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -56,7 +56,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -78,7 +77,6 @@ public class HttpClientMgr implements ClientMgr {
     private final HttpMsgSenderConfig httpConfig;
     private CloseableHttpClient httpClient;
     private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
-    private final Semaphore asyncIdleCellCnt;
     private final ExecutorService workerServices = 
Executors.newCachedThreadPool();
     private volatile boolean existSend = false;
     private final AtomicBoolean shutDown = new AtomicBoolean(false);
@@ -94,8 +92,7 @@ public class HttpClientMgr implements ClientMgr {
     public HttpClientMgr(BaseSender sender, HttpMsgSenderConfig httpConfig) {
         this.sender = sender;
         this.httpConfig = httpConfig;
-        this.messageCache = new 
LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize());
-        this.asyncIdleCellCnt = new 
Semaphore(httpConfig.getHttpAsyncRptCacheSize(), true);
+        this.messageCache = new 
LinkedBlockingQueue<>(httpConfig.getMaxInFlightReqCnt());
     }
 
     @Override
@@ -162,7 +159,7 @@ public class HttpClientMgr implements ClientMgr {
                         logger.error("HttpAsync({}) callback event exception", 
this.sender.getSenderId(), ex);
                     }
                 } finally {
-                    asyncIdleCellCnt.release();
+                    
sender.releaseCachePermits(asyncObj.getHttpEvent().getBodySize());
                     if (isSucc) {
                         
sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(),
                                 asyncObj.getHttpEvent().getStreamId(), 
asyncObj.getHttpEvent().getMsgCnt(),
@@ -282,21 +279,12 @@ public class HttpClientMgr implements ClientMgr {
         if (curNodes.isEmpty()) {
             return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
         }
-        if (!this.asyncIdleCellCnt.tryAcquire()) {
-            return procResult.setFailResult(ErrorCode.HTTP_ASYNC_POOL_FULL);
-        }
-        boolean released = false;
         try {
             if (!this.messageCache.offer(asyncObj)) {
-                this.asyncIdleCellCnt.release();
-                released = true;
                 return 
procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_FAIL);
             }
             return procResult.setSuccess();
         } catch (Throwable ex) {
-            if (!released) {
-                this.asyncIdleCellCnt.release();
-            }
             if (asyncSendExptCnt.shouldPrint()) {
                 logger.warn("ClientMgr({}) async offer event exception", 
this.sender.getSenderId(), ex);
             }
@@ -498,7 +486,7 @@ public class HttpClientMgr implements ClientMgr {
                             logger.error("HttpAsync({}) report event 
exception", workerId, ex);
                         }
                     } finally {
-                        asyncIdleCellCnt.release();
+                        
sender.releaseCachePermits(asyncObj.getHttpEvent().getBodySize());
                         if (procResult.isSuccess()) {
                             
sender.getMetricHolder().addCallbackSucMetric(asyncObj.getHttpEvent().getGroupId(),
                                     asyncObj.getHttpEvent().getStreamId(), 
asyncObj.getHttpEvent().getMsgCnt(),
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
index 90132018a7..1d6cfa800e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpCallFuture.java
@@ -36,6 +36,7 @@ public class TcpCallFuture implements MsgSendCallback {
     private final String groupId;
     private final String streamId;
     private final int msgCnt;
+    private final int eventSize;
     private final long rtTime;
     private final String clientAddr;
     private final long chanTerm;
@@ -53,6 +54,7 @@ public class TcpCallFuture implements MsgSendCallback {
         this.streamId = encObject.getStreamId();
         this.rtTime = encObject.getRtms();
         this.msgCnt = encObject.getMsgCnt();
+        this.eventSize = encObject.getEventSize();
         this.clientAddr = clientAddr;
         this.chanTerm = chanTerm;
         this.chanStr = chanStr;
@@ -132,4 +134,8 @@ public class TcpCallFuture implements MsgSendCallback {
     public boolean isAsyncCall() {
         return isAsyncCall;
     }
+
+    public int getEventSize() {
+        return eventSize;
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
index 0a9c74b616..0ad5e44e44 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpClientMgr.java
@@ -394,6 +394,7 @@ public class TcpClientMgr implements ClientMgr {
             }
         } finally {
             this.descInflightMsgCnt(callFuture);
+            this.releaseAsyncCachedPermits(callFuture);
             if (decObject.getSendResult().isSuccess()) {
                 
baseSender.getMetricHolder().addCallbackSucMetric(callFuture.getGroupId(),
                         callFuture.getStreamId(), callFuture.getMsgCnt(),
@@ -475,6 +476,7 @@ public class TcpClientMgr implements ClientMgr {
                     }
                 } finally {
                     nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    this.releaseAsyncCachedPermits(callFuture);
                     
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.CONNECTION_BREAK.getErrCode(),
                             callFuture.getGroupId(), callFuture.getStreamId(), 
callFuture.getMsgCnt(),
                             (System.currentTimeMillis() - curTime));
@@ -495,6 +497,7 @@ public class TcpClientMgr implements ClientMgr {
                     }
                 } finally {
                     nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    this.releaseAsyncCachedPermits(callFuture);
                     
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.CONNECTION_BREAK.getErrCode(),
                             callFuture.getGroupId(), callFuture.getStreamId(), 
callFuture.getMsgCnt(),
                             (System.currentTimeMillis() - curTime));
@@ -578,6 +581,7 @@ public class TcpClientMgr implements ClientMgr {
                     }
                 } finally {
                     nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    this.releaseAsyncCachedPermits(callFuture);
                     
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
                             callFuture.getGroupId(), callFuture.getStreamId(), 
callFuture.getMsgCnt(),
                             (System.currentTimeMillis() - curTime));
@@ -598,6 +602,7 @@ public class TcpClientMgr implements ClientMgr {
                     }
                 } finally {
                     nettyTcpClient.decInFlightMsgCnt(callFuture.getChanTerm());
+                    this.releaseAsyncCachedPermits(callFuture);
                     
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SDK_CLOSED.getErrCode(),
                             callFuture.getGroupId(), callFuture.getStreamId(), 
callFuture.getMsgCnt(),
                             (System.currentTimeMillis() - curTime));
@@ -606,6 +611,12 @@ public class TcpClientMgr implements ClientMgr {
         }
     }
 
+    private void releaseAsyncCachedPermits(TcpCallFuture callFuture) {
+        if (callFuture.isAsyncCall()) {
+            baseSender.releaseCachePermits(callFuture.getEventSize());
+        }
+    }
+
     private class MaintThread extends Thread {
 
         private volatile boolean bShutDown;
@@ -729,9 +740,11 @@ public class TcpClientMgr implements ClientMgr {
                     }
                 } finally {
                     nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
-                    
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SEND_WAIT_TIMEOUT.getErrCode(),
-                            future.getGroupId(), future.getStreamId(), 
future.getMsgCnt(),
-                            (System.currentTimeMillis() - curTime));
+                    releaseAsyncCachedPermits(future);
+                    baseSender.getMetricHolder().addCallbackFailMetric(
+                            ErrorCode.SEND_WAIT_TIMEOUT.getErrCode(),
+                            future.getGroupId(), future.getStreamId(),
+                            future.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
                 }
                 return;
             }
@@ -753,6 +766,7 @@ public class TcpClientMgr implements ClientMgr {
                     }
                 } finally {
                     nettyTcpClient.decInFlightMsgCnt(future.getChanTerm());
+                    releaseAsyncCachedPermits(future);
                     
baseSender.getMetricHolder().addCallbackFailMetric(ErrorCode.SEND_WAIT_TIMEOUT.getErrCode(),
                             future.getGroupId(), future.getStreamId(), 
future.getMsgCnt(),
                             (System.currentTimeMillis() - curTime));
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
index c521b04cd1..073e2b60c0 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/TcpNettyClient.java
@@ -454,7 +454,7 @@ public class TcpNettyClient {
 
     private EncodeObject buildHeartBeatMsg(String senderId, ProxyClientConfig 
configure) {
         EncodeObject encObject = new EncodeObject(null, null,
-                MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis());
+                MsgType.MSG_BIN_HEARTBEAT, System.currentTimeMillis(), 30);
         encObject.setMessageIdInfo(0);
         int intMsgType = encObject.getMsgType().getValue();
         Map<String, String> newAttrs = new HashMap<>();
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
index a32227c1af..3b44f9a3ce 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/tcp/codec/EncodeObject.java
@@ -44,6 +44,7 @@ public class EncodeObject {
     private final long rtms;
     private int messageId;
     private int msgCnt = 0;
+    private int eventSize;
     private int extField = 0;
     private int attrDataLength = 0;
     private byte[] attrData = null;
@@ -56,10 +57,11 @@ public class EncodeObject {
     private boolean compress;
     private byte[] aesKey;
 
-    public EncodeObject(String groupId, String streamId, MsgType msgType, long 
dtMs) {
+    public EncodeObject(String groupId, String streamId, MsgType msgType, long 
dtMs, int eventSize) {
         this.groupId = groupId;
         this.streamId = streamId;
         this.msgType = msgType;
+        this.eventSize = eventSize;
         this.intMsgType = this.msgType.getValue();
         this.rtms = System.currentTimeMillis();
         if (this.msgType == MsgType.MSG_BIN_MULTI_BODY) {
@@ -185,4 +187,8 @@ public class EncodeObject {
     public long getDtMs() {
         return dtMs;
     }
+
+    public int getEventSize() {
+        return eventSize;
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index 99e0c38289..cb4d2bae29 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -21,14 +21,17 @@ import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
 import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.config.ConfigHolder;
 import org.apache.inlong.sdk.dataproxy.config.HostInfo;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.metric.MetricDataHolder;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,6 +79,8 @@ public abstract class BaseSender implements ConfigHolder {
     protected volatile boolean idTransNum = false;
     protected volatile int groupIdNum = 0;
     private Map<String, Integer> streamIdMap = new HashMap<>();
+    private final PkgCacheQuota globalCacheQuota;
+    private final PkgCacheQuota sdkPkgCacheQuota;
     // metric holder
     protected MetricDataHolder metricHolder;
 
@@ -91,6 +96,13 @@ public abstract class BaseSender implements ConfigHolder {
         this.configManager = new ProxyConfigManager(this.senderId, 
this.baseConfig, this);
         this.configManager.setDaemon(true);
         this.metricHolder = new MetricDataHolder(this);
+        if (this.senderFactory == null) {
+            this.globalCacheQuota = null;
+        } else {
+            this.globalCacheQuota = 
this.senderFactory.getFactoryPkgCacheQuota();
+        }
+        this.sdkPkgCacheQuota = new PkgCacheQuota(false, this.senderId,
+                configure.getMaxInFlightReqCnt(), 
configure.getMaxInFlightSizeKb(), configure.getPaddingSize());
     }
 
     public boolean start(ProcessResult procResult) {
@@ -198,6 +210,10 @@ public abstract class BaseSender implements ConfigHolder {
         return senderStatus.get() == SENDER_STATUS_STARTED;
     }
 
+    public boolean isGenByFactory() {
+        return senderFactory != null;
+    }
+
     public MsgSenderFactory getSenderFactory() {
         return senderFactory;
     }
@@ -242,6 +258,64 @@ public abstract class BaseSender implements ConfigHolder {
         return proxyNodeInfos.size();
     }
 
+    public Tuple2<Integer, Integer> getFactoryAvailQuota() {
+        if (senderFactory == null) {
+            return PkgCacheQuota.DISABLE_RET;
+        }
+        return globalCacheQuota.getPkgCacheAvailQuota();
+    }
+
+    public Tuple2<Integer, Integer> getSenderAvailQuota() {
+        return sdkPkgCacheQuota.getPkgCacheAvailQuota();
+    }
+
+    public int getFactoryPkgCntPermits() {
+        if (senderFactory == null) {
+            return SdkConsts.UNDEFINED_VALUE;
+        }
+        return senderFactory.getFactoryPkgCacheQuota().getPkgCntPermits();
+    }
+
+    public int getFactoryPkgSizeKbPermits() {
+        if (senderFactory == null) {
+            return SdkConsts.UNDEFINED_VALUE;
+        }
+        return senderFactory.getFactoryPkgCacheQuota().getPkgSizeKbPermits();
+    }
+
+    public int getSenderPkgCntPermits() {
+        return sdkPkgCacheQuota.getPkgCntPermits();
+    }
+
+    public int getSenderPkgSizeKbPermits() {
+        return sdkPkgCacheQuota.getPkgSizeKbPermits();
+    }
+
+    public boolean tryAcquireCachePermits(int sizeInByte, ProcessResult 
procResult) {
+        if (this.globalCacheQuota == null) {
+            return this.sdkPkgCacheQuota.tryAcquire(sizeInByte, procResult);
+        } else {
+            if (this.globalCacheQuota.tryAcquire(sizeInByte, procResult)) {
+                if (this.sdkPkgCacheQuota.tryAcquire(sizeInByte, procResult)) {
+                    return true;
+                } else {
+                    this.globalCacheQuota.release(sizeInByte);
+                    return false;
+                }
+            }
+            return false;
+        }
+    }
+
+    public void releaseCachePermits(int sizeInByte) {
+        if (this.globalCacheQuota == null) {
+            this.sdkPkgCacheQuota.release(sizeInByte);
+        } else {
+            this.sdkPkgCacheQuota.release(sizeInByte);
+            this.globalCacheQuota.release(sizeInByte);
+        }
+    }
+
     public abstract int getActiveNodeCnt();
 
     public abstract int getInflightMsgCnt();
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
index d05f4c821e..2d5567324e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
@@ -56,8 +56,6 @@ public class HttpMsgSenderConfig extends ProxyClientConfig 
implements Cloneable
     private long httpCloseWaitPeriodMs = 
SdkConsts.VAL_DEF_HTTP_SDK_CLOSE_WAIT_MS;
     // node reuse freezing time
     private long httpNodeReuseWaitIfFailMs = 
SdkConsts.VAL_DEF_HTTP_REUSE_FAIL_WAIT_MS;
-    // message cache size for async report data.
-    private int httpAsyncRptCacheSize = 
SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_SIZE;
     // thread number for async report data.
     private int httpAsyncRptWorkerNum = 
SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_WORKER_NUM;
     // interval for async worker in microseconds.
@@ -66,21 +64,25 @@ public class HttpMsgSenderConfig extends ProxyClientConfig 
implements Cloneable
     public HttpMsgSenderConfig(boolean visitMgrByHttps,
             String managerIP, int managerPort, String groupId) throws 
ProxySdkException {
         super(visitMgrByHttps, managerIP, managerPort, groupId, 
ReportProtocol.HTTP, null);
+        this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
     }
 
     public HttpMsgSenderConfig(String managerAddress, String groupId) throws 
ProxySdkException {
         super(managerAddress, groupId, ReportProtocol.HTTP, null);
+        this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
     }
 
     public HttpMsgSenderConfig(boolean visitMgrByHttps, String managerIP, int 
managerPort,
             String groupId, String mgrAuthSecretId, String mgrAuthSecretKey) 
throws ProxySdkException {
         super(visitMgrByHttps, managerIP, managerPort, groupId, 
ReportProtocol.HTTP, null);
+        this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
         this.setMgrAuthzInfo(true, mgrAuthSecretId, mgrAuthSecretKey);
     }
 
     public HttpMsgSenderConfig(String managerAddress,
             String groupId, String mgrAuthSecretId, String mgrAuthSecretKey) 
throws ProxySdkException {
         super(managerAddress, groupId, ReportProtocol.HTTP, null);
+        this.setMaxInFlightReqCnt(SdkConsts.VAL_DEF_HTTP_ASYNC_RPT_CACHE_CNT);
         this.setMgrAuthzInfo(true, mgrAuthSecretId, mgrAuthSecretKey);
     }
 
@@ -171,17 +173,11 @@ public class HttpMsgSenderConfig extends 
ProxyClientConfig implements Cloneable
                         Math.max(SdkConsts.VAL_MIN_HTTP_REUSE_FAIL_WAIT_MS, 
httpNodeReuseWaitIfFailMs));
     }
 
-    public int getHttpAsyncRptCacheSize() {
-        return httpAsyncRptCacheSize;
-    }
-
     public int getHttpAsyncRptWorkerNum() {
         return httpAsyncRptWorkerNum;
     }
 
-    public void setHttpAsyncRptPoolConfig(int httpAsyncRptCacheSize, int 
httpAsyncRptWorkerNum) {
-        this.httpAsyncRptCacheSize =
-                Math.max(SdkConsts.VAL_MIN_HTTP_ASYNC_RPT_CACHE_SIZE, 
httpAsyncRptCacheSize);
+    public void setHttpAsyncRptWorkerNum(int httpAsyncRptWorkerNum) {
         this.httpAsyncRptWorkerNum =
                 Math.max(SdkConsts.VAL_MIN_HTTP_ASYNC_RPT_WORKER_NUM, 
httpAsyncRptWorkerNum);
     }
@@ -212,7 +208,6 @@ public class HttpMsgSenderConfig extends ProxyClientConfig 
implements Cloneable
                 && discardHttpCacheWhenClosing == 
that.discardHttpCacheWhenClosing
                 && httpCloseWaitPeriodMs == that.httpCloseWaitPeriodMs
                 && httpNodeReuseWaitIfFailMs == that.httpNodeReuseWaitIfFailMs
-                && httpAsyncRptCacheSize == that.httpAsyncRptCacheSize
                 && httpAsyncRptWorkerNum == that.httpAsyncRptWorkerNum
                 && httpAsyncWorkerIdleWaitMs == that.httpAsyncWorkerIdleWaitMs
                 && httpContentType == that.httpContentType
@@ -224,7 +219,7 @@ public class HttpMsgSenderConfig extends ProxyClientConfig 
implements Cloneable
         return Objects.hash(super.hashCode(), rptDataByHttps, httpContentType,
                 httpEventsSeparator, sepEventByLF, httpConTimeoutMs, 
httpSocketTimeoutMs,
                 discardHttpCacheWhenClosing, httpCloseWaitPeriodMs, 
httpNodeReuseWaitIfFailMs,
-                httpAsyncRptCacheSize, httpAsyncRptWorkerNum, 
httpAsyncWorkerIdleWaitMs);
+                httpAsyncRptWorkerNum, httpAsyncWorkerIdleWaitMs);
     }
 
     @Override
@@ -253,7 +248,6 @@ public class HttpMsgSenderConfig extends ProxyClientConfig 
implements Cloneable
                         .append(", 
discardHttpCacheWhenClosing=").append(discardHttpCacheWhenClosing)
                         .append(", 
httpCloseWaitPeriodMs=").append(httpCloseWaitPeriodMs)
                         .append(", 
httpNodeReuseWaitIfFailMs=").append(httpNodeReuseWaitIfFailMs)
-                        .append(", 
httpAsyncRptCacheSize=").append(httpAsyncRptCacheSize)
                         .append(", 
httpAsyncRptWorkerNum=").append(httpAsyncRptWorkerNum)
                         .append(", 
httpAsyncWorkerIdleWaitMs=").append(httpAsyncWorkerIdleWaitMs);
         return super.getSetting(strBuff);
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
index b764e40b52..be4fd033c7 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
@@ -82,10 +82,15 @@ public class InLongHttpMsgSender extends BaseSender 
implements HttpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
+        boolean gotPermits = false;
         try {
             if (this.isMetaInfoUnReady()) {
                 return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
             }
+            if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+                return false;
+            }
+            gotPermits = true;
             // check package length
             if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(), 
procResult)) {
                 return false;
@@ -96,6 +101,9 @@ public class InLongHttpMsgSender extends BaseSender 
implements HttpMsgSender {
                 metricHolder.addAsyncHttpSucPutMetric(
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             } else {
+                if (gotPermits) {
+                    releaseCachePermits(eventInfo.getBodySize());
+                }
                 metricHolder.addAsyncHttpFailPutMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
index 8add9f15f3..ea271cb8ec 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/InLongTcpMsgSender.java
@@ -79,10 +79,18 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
+        boolean gotPermits = false;
         long curTime = System.currentTimeMillis();
         try {
+            if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+                return false;
+            }
+            gotPermits = true;
             return processEvent(SendQos.SOURCE_ACK, eventInfo, null, 
procResult);
         } finally {
+            if (gotPermits) {
+                releaseCachePermits(eventInfo.getBodySize());
+            }
             if (procResult.isSuccess()) {
                 metricHolder.addSyncSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
                         eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
@@ -100,13 +108,21 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
+        boolean gotPermits = false;
         try {
+            if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+                return false;
+            }
+            gotPermits = true;
             return processEvent(SendQos.SOURCE_ACK, eventInfo, callback, 
procResult);
         } finally {
             if (procResult.isSuccess()) {
                 metricHolder.addAsyncSucReqMetric(
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             } else {
+                if (gotPermits) {
+                    releaseCachePermits(eventInfo.getBodySize());
+                }
                 metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
@@ -139,10 +155,18 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
+        boolean gotPermits = false;
         long curTime = System.currentTimeMillis();
         try {
+            if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+                return false;
+            }
+            gotPermits = true;
             return processEvent(SendQos.SINK_ACK, eventInfo, null, procResult);
         } finally {
+            if (gotPermits) {
+                releaseCachePermits(eventInfo.getBodySize());
+            }
             if (procResult.isSuccess()) {
                 metricHolder.addSyncSucMetric(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
                         eventInfo.getMsgCnt(), (System.currentTimeMillis() - 
curTime));
@@ -160,13 +184,21 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (!this.isStarted()) {
             return procResult.setFailResult(ErrorCode.SDK_CLOSED);
         }
+        boolean gotPermits = false;
         try {
+            if (!tryAcquireCachePermits(eventInfo.getBodySize(), procResult)) {
+                return false;
+            }
+            gotPermits = true;
             return processEvent(SendQos.SINK_ACK, eventInfo, callback, 
procResult);
         } finally {
             if (procResult.isSuccess()) {
                 metricHolder.addAsyncSucReqMetric(
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             } else {
+                if (gotPermits) {
+                    releaseCachePermits(eventInfo.getBodySize());
+                }
                 metricHolder.addAsyncFailReqMetric(procResult.getErrCode(),
                         eventInfo.getGroupId(), eventInfo.getStreamId(), 
eventInfo.getMsgCnt());
             }
@@ -188,8 +220,9 @@ public class InLongTcpMsgSender extends BaseSender 
implements TcpMsgSender {
         if (this.isMetaInfoUnReady()) {
             return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
         }
-        EncodeObject encObject = new EncodeObject(eventInfo.getGroupId(),
-                eventInfo.getStreamId(), tcpConfig.getSdkMsgType(), 
eventInfo.getDtMs());
+        EncodeObject encObject =
+                new EncodeObject(eventInfo.getGroupId(), 
eventInfo.getStreamId(),
+                        tcpConfig.getSdkMsgType(), eventInfo.getDtMs(), 
eventInfo.getBodySize());
         // pre-process attributes
         processEventAttrsInfo(sendQos, eventInfo, encObject);
         // check package length
diff --git 
a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/PkgCacheQuotaTest.java
 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/PkgCacheQuotaTest.java
new file mode 100644
index 0000000000..6803e70cd1
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/PkgCacheQuotaTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.network.PkgCacheQuota;
+import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PkgCacheQuotaTest {
+
+    @Test
+    public void testPkgCacheQuota() throws Exception {
+        ProcessResult procResult = new ProcessResult();
+        PkgCacheQuota quota1 =
+                new PkgCacheQuota(false, "q1", -1, -1, 20);
+        Assert.assertTrue(quota1.tryAcquire(30, procResult));
+        Assert.assertTrue(quota1.tryAcquire(1000000, procResult));
+        quota1.release(30);
+        Tuple2<Integer, Integer> result = quota1.getPkgCacheAvailQuota();
+        Assert.assertEquals(result.getF0().intValue(), SdkConsts.UNDEFINED_VALUE);
+        Assert.assertEquals(result.getF1().intValue(), SdkConsts.UNDEFINED_VALUE);
+
+        PkgCacheQuota quota2 = new PkgCacheQuota(true, "q2", 3, -1, 20);
+        Assert.assertTrue(quota2.tryAcquire(30, procResult));
+        Assert.assertTrue(quota2.tryAcquire(50, procResult));
+        Assert.assertTrue(quota2.tryAcquire(40, procResult));
+        Assert.assertFalse(quota2.tryAcquire(50, procResult));
+        result = quota2.getPkgCacheAvailQuota();
+        Assert.assertNotNull(result);
+        Assert.assertNotNull(result.getF0());
+        Assert.assertEquals(result.getF1().intValue(), SdkConsts.UNDEFINED_VALUE);
+        Assert.assertEquals(result.getF0().intValue(), 0);
+        quota2.release(50);
+        result = quota2.getPkgCacheAvailQuota();
+        Assert.assertEquals(result.getF0().intValue(), 1);
+
+        PkgCacheQuota quota3 = new PkgCacheQuota(false, "q3", -1, 5, 1014);
+        Assert.assertTrue(quota3.tryAcquire(10, procResult));
+        Assert.assertTrue(quota3.tryAcquire(20, procResult));
+        Assert.assertTrue(quota3.tryAcquire(30, procResult));
+        Assert.assertFalse(quota3.tryAcquire(40000, procResult));
+        Assert.assertFalse(quota3.tryAcquire(50, procResult));
+        result = quota3.getPkgCacheAvailQuota();
+        Assert.assertNotNull(result);
+        Assert.assertEquals(result.getF0().intValue(), SdkConsts.UNDEFINED_VALUE);
+        Assert.assertEquals(result.getF1().intValue(), 0);
+        quota3.release(50);
+        result = quota3.getPkgCacheAvailQuota();
+        Assert.assertNotNull(result);
+        Assert.assertEquals(result.getF0().intValue(), SdkConsts.UNDEFINED_VALUE);
+        Assert.assertEquals(result.getF1().intValue(), 2);
+
+        PkgCacheQuota quota4 = new PkgCacheQuota(true, "pa5", 5, 5, 1014);
+        Assert.assertTrue(quota4.tryAcquire(10, procResult));
+        Assert.assertTrue(quota4.tryAcquire(20, procResult));
+        Assert.assertTrue(quota4.tryAcquire(30, procResult));
+        Assert.assertFalse(quota4.tryAcquire(40000, procResult));
+        Assert.assertFalse(quota4.tryAcquire(50, procResult));
+        result = quota4.getPkgCacheAvailQuota();
+        Assert.assertNotNull(result);
+        Assert.assertEquals(result.getF0().intValue(), 2);
+        Assert.assertEquals(result.getF1().intValue(), 0);
+        quota4.release(50);
+        result = quota4.getPkgCacheAvailQuota();
+        Assert.assertNotNull(result);
+        Assert.assertEquals(result.getF0().intValue(), 3);
+        Assert.assertEquals(result.getF1().intValue(), 2);
+    }
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
index 68a08a00b9..b314f8a6e4 100644
--- a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
+++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfigTest.java
@@ -29,13 +29,15 @@ public class ProxyClientConfigTest {
     public void testManagerConfig() throws Exception {
         HttpMsgSenderConfig httpConfig = new HttpMsgSenderConfig(
                 "http://127.0.0.1:800", "test_id", "secretId", "secretKey");
-        httpConfig.setHttpAsyncRptPoolConfig(30, 20);
+        httpConfig.setMaxInFlightReqCnt(30);
+        httpConfig.setHttpAsyncRptWorkerNum(20);
         HttpMsgSenderConfig httpConfig1 = httpConfig.clone();
         Assert.assertEquals(httpConfig, httpConfig1);
         httpConfig1.setRegionName("sz");
-        httpConfig1.setHttpAsyncRptPoolConfig(50, 10);
+        httpConfig.setMaxInFlightReqCnt(50);
+        httpConfig.setHttpAsyncRptWorkerNum(10);
         Assert.assertNotEquals(httpConfig1.getRegionName(), httpConfig.getRegionName());
-        Assert.assertNotEquals(httpConfig1.getHttpAsyncRptCacheSize(), httpConfig.getHttpAsyncRptCacheSize());
+        Assert.assertNotEquals(httpConfig1.getMaxInFlightReqCnt(), httpConfig.getMaxInFlightReqCnt());
         Assert.assertNotEquals(httpConfig1.getHttpAsyncRptWorkerNum(), httpConfig.getHttpAsyncRptWorkerNum());
         httpConfig.setRptDataByHttps(true);
         httpConfig.setMetaCacheExpiredMs(30000);

Reply via email to