This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ea0c20407a [INLONG-11706][SDK] Optimize HTTP Sender implementation 
(#11707)
ea0c20407a is described below

commit ea0c20407aceaeb0fb7b1d3a985871d209496cac
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jan 22 17:49:09 2025 +0800

    [INLONG-11706][SDK] Optimize HTTP Sender implementation (#11707)
    
    * [INLONG-11706][SDK] Optimize HTTP Sender implementation
    
    * [INLONG-11706][SDK] Optimize HTTP Sender implementation
    
    ---------
    
    Co-authored-by: gosonzhang <[email protected]>
---
 .../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 112 ++++-
 .../inlong/sdk/dataproxy/MsgSenderFactory.java     |  20 +
 .../sdk/dataproxy/MsgSenderMultiFactory.java       |  21 +-
 .../sdk/dataproxy/MsgSenderSingleFactory.java      |  20 +-
 .../inlong/sdk/dataproxy/common/ErrorCode.java     |  10 +
 .../inlong/sdk/dataproxy/common/SdkConsts.java     |   4 +
 .../dataproxy/example/InLongFactoryExample.java    |  38 ++
 .../dataproxy/example/InLongHttpClientExample.java |  73 ++++
 .../sdk/dataproxy/http/InternalHttpSender.java     |   3 +-
 .../sdk/dataproxy/network/HttpProxySender.java     |   2 +
 .../sdk/dataproxy/network/http/HttpAsyncObj.java   |  51 +++
 .../sdk/dataproxy/network/http/HttpClientMgr.java  | 461 +++++++++++++++++++++
 .../{common => network/http}/HttpContentType.java  |   2 +-
 .../inlong/sdk/dataproxy/sender/BaseSender.java    |   2 +
 .../dataproxy/sender/http/HttpMsgSenderConfig.java |   2 +-
 .../dataproxy/sender/http/InLongHttpMsgSender.java | 136 ++++++
 .../inlong/sdk/dataproxy/utils/HttpUtils.java      |  71 ++++
 17 files changed, 1010 insertions(+), 18 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 0bc0cf4bb9..50fa183c58 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -23,6 +23,8 @@ import 
org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -69,9 +71,9 @@ public class BaseMsgSenderFactory {
         senderCacheLock.writeLock().lock();
         try {
             // release groupId mapped senders
-            totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap);
+            totalSenderCnt = releaseAllGroupIdSenders(groupIdSenderMap);
             // release clusterId mapped senders
-            totalSenderCnt += 
innReleaseAllClusterIdSenders(clusterIdSenderMap);
+            totalSenderCnt += releaseAllClusterIdSenders(clusterIdSenderMap);
         } finally {
             senderCacheLock.writeLock().unlock();
         }
@@ -90,9 +92,9 @@ public class BaseMsgSenderFactory {
         senderCacheLock.writeLock().lock();
         try {
             if (msgSender.getFactoryClusterIdKey() == null) {
-                removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap);
+                removed = removeGroupIdSender(msgSender, groupIdSenderMap);
             } else {
-                removed = innRemoveClusterIdSender(msgSender, 
clusterIdSenderMap);
+                removed = removeClusterIdSender(msgSender, clusterIdSenderMap);
             }
         } finally {
             senderCacheLock.writeLock().unlock();
@@ -108,7 +110,7 @@ public class BaseMsgSenderFactory {
 
     public InLongTcpMsgSender genTcpSenderByGroupId(
             TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
-        ProxyUtils.validProxyConfigNotNull(configure);
+        validProxyConfigNotNull(configure);
         // query cached sender
         String metaConfigKey = configure.getGroupMetaConfigKey();
         InLongTcpMsgSender messageSender =
@@ -148,9 +150,51 @@ public class BaseMsgSenderFactory {
         }
     }
 
+    public InLongHttpMsgSender genHttpSenderByGroupId(
+            HttpMsgSenderConfig configure) throws ProxySdkException {
+        validProxyConfigNotNull(configure);
+        // query cached sender
+        String metaConfigKey = configure.getGroupMetaConfigKey();
+        InLongHttpMsgSender messageSender =
+                (InLongHttpMsgSender) groupIdSenderMap.get(metaConfigKey);
+        if (messageSender != null) {
+            return messageSender;
+        }
+        // valid configure info
+        ProcessResult procResult = new ProcessResult();
+        qryProxyMetaConfigure(configure, procResult);
+        // generate sender
+        senderCacheLock.writeLock().lock();
+        try {
+            // re-get the created sender based on the groupId key after locked
+            messageSender = (InLongHttpMsgSender) 
groupIdSenderMap.get(metaConfigKey);
+            if (messageSender != null) {
+                return messageSender;
+            }
+            // build a new sender based on groupId
+            messageSender = new InLongHttpMsgSender(configure, 
msgSenderFactory, null);
+            if (!messageSender.start(procResult)) {
+                messageSender.close();
+                throw new ProxySdkException("Failed to start groupId sender: " 
+ procResult);
+            }
+            groupIdSenderMap.put(metaConfigKey, messageSender);
+            logger.info("MsgSenderFactory({}) generated a new groupId({}) 
sender({})",
+                    this.factoryNo, metaConfigKey, 
messageSender.getSenderId());
+            return messageSender;
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("MsgSenderFactory({}) build groupId sender({}) 
exception",
+                        this.factoryNo, metaConfigKey, ex);
+            }
+            throw new ProxySdkException("Failed to build groupId sender: " + 
ex.getMessage());
+        } finally {
+            senderCacheLock.writeLock().unlock();
+        }
+    }
+
     public InLongTcpMsgSender genTcpSenderByClusterId(
             TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException {
-        ProxyUtils.validProxyConfigNotNull(configure);
+        validProxyConfigNotNull(configure);
         // get groupId's clusterIdKey
         ProcessResult procResult = new ProcessResult();
         ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, 
procResult);;
@@ -191,6 +235,48 @@ public class BaseMsgSenderFactory {
         }
     }
 
+    public InLongHttpMsgSender genHttpSenderByClusterId(
+            HttpMsgSenderConfig configure) throws ProxySdkException {
+        validProxyConfigNotNull(configure);
+        // get groupId's clusterIdKey
+        ProcessResult procResult = new ProcessResult();
+        ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure, 
procResult);;
+        String clusterIdKey = ProxyUtils.buildClusterIdKey(
+                configure.getDataRptProtocol(), configure.getRegionName(), 
proxyConfigEntry.getClusterId());
+        // get local built sender
+        InLongHttpMsgSender messageSender = (InLongHttpMsgSender) 
clusterIdSenderMap.get(clusterIdKey);
+        if (messageSender != null) {
+            return messageSender;
+        }
+        // generate sender
+        senderCacheLock.writeLock().lock();
+        try {
+            // re-get the created sender based on the clusterId Key after 
locked
+            messageSender = (InLongHttpMsgSender) 
clusterIdSenderMap.get(clusterIdKey);
+            if (messageSender != null) {
+                return messageSender;
+            }
+            // build a new sender based on clusterId Key
+            messageSender = new InLongHttpMsgSender(configure, 
msgSenderFactory, clusterIdKey);
+            if (!messageSender.start(procResult)) {
+                messageSender.close();
+                throw new ProxySdkException("Failed to start cluster sender: " 
+ procResult);
+            }
+            clusterIdSenderMap.put(clusterIdKey, messageSender);
+            logger.info("MsgSenderFactory({}) generated a new clusterId({}) 
sender({})",
+                    this.factoryNo, clusterIdKey, messageSender.getSenderId());
+            return messageSender;
+        } catch (Throwable ex) {
+            if (exptCounter.shouldPrint()) {
+                logger.warn("MsgSenderFactory({}) build cluster sender({}) 
exception",
+                        this.factoryNo, clusterIdKey, ex);
+            }
+            throw new ProxySdkException("Failed to build cluster sender: " + 
ex.getMessage());
+        } finally {
+            senderCacheLock.writeLock().unlock();
+        }
+    }
+
     private ProxyConfigEntry qryProxyMetaConfigure(
             ProxyClientConfig proxyConfig, ProcessResult procResult) throws 
ProxySdkException {
         ProxyConfigManager inlongMetaQryMgr = new 
ProxyConfigManager(proxyConfig);
@@ -205,7 +291,7 @@ public class BaseMsgSenderFactory {
         return inlongMetaQryMgr.getProxyConfigEntry();
     }
 
-    private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String, 
BaseSender> senderMap) {
+    private boolean removeGroupIdSender(BaseSender msgSender, Map<String, 
BaseSender> senderMap) {
         BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey());
         if (tmpSender == null
                 || !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
@@ -214,7 +300,7 @@ public class BaseMsgSenderFactory {
         return senderMap.remove(msgSender.getMetaConfigKey()) != null;
     }
 
-    private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String, 
BaseSender> senderMap) {
+    private boolean removeClusterIdSender(BaseSender msgSender, Map<String, 
BaseSender> senderMap) {
         BaseSender tmpSender = 
senderMap.get(msgSender.getFactoryClusterIdKey());
         if (tmpSender == null
                 || !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
@@ -223,7 +309,7 @@ public class BaseMsgSenderFactory {
         return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null;
     }
 
-    private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap) 
{
+    private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
         int totalSenderCnt = 0;
         for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
             if (entry == null || entry.getValue() == null) {
@@ -243,7 +329,7 @@ public class BaseMsgSenderFactory {
         return totalSenderCnt;
     }
 
-    private int innReleaseAllClusterIdSenders(Map<String, BaseSender> 
senderMap) {
+    private int releaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
         int totalSenderCnt = 0;
         for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
             if (entry == null
@@ -264,4 +350,10 @@ public class BaseMsgSenderFactory {
         senderMap.clear();
         return totalSenderCnt;
     }
+
+    private void validProxyConfigNotNull(ProxyClientConfig configure) throws 
ProxySdkException {
+        if (configure == null) {
+            throw new ProxySdkException("configure is null!");
+        }
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
index a6a4e20b4c..d2169d7152 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 
@@ -88,4 +90,22 @@ public interface MsgSenderFactory {
      */
     InLongTcpMsgSender genTcpSenderByClusterId(
             TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) 
throws ProxySdkException;
+
+    /**
+     * Get or generate a http sender from the factory according to groupId
+     *
+     * @param configure  the sender configure
+     * @return the sender
+     */
+    InLongHttpMsgSender genHttpSenderByGroupId(
+            HttpMsgSenderConfig configure) throws ProxySdkException;
+
+    /**
+     * Get or generate a http sender from the factory according to clusterId
+     *
+     * @param configure  the sender configure
+     * @return the sender
+     */
+    InLongHttpMsgSender genHttpSenderByClusterId(
+            HttpMsgSenderConfig configure) throws ProxySdkException;
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index 0cb595c261..f42d8b4d61 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -19,9 +19,10 @@ package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
 
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,7 +80,6 @@ public class MsgSenderMultiFactory implements 
MsgSenderFactory {
         if (!this.initialized.get()) {
             throw new ProxySdkException("Please initialize the factory 
first!");
         }
-        ProxyUtils.validProxyConfigNotNull(configure);
         return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure, 
selfDefineFactory);
     }
 
@@ -95,7 +95,22 @@ public class MsgSenderMultiFactory implements 
MsgSenderFactory {
         if (!this.initialized.get()) {
             throw new ProxySdkException("Please initialize the factory 
first!");
         }
-        ProxyUtils.validProxyConfigNotNull(configure);
         return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure, 
selfDefineFactory);
     }
+
+    @Override
+    public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig 
configure) throws ProxySdkException {
+        if (!this.initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        return this.baseMsgSenderFactory.genHttpSenderByGroupId(configure);
+    }
+
+    @Override
+    public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig 
configure) throws ProxySdkException {
+        if (!this.initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        return this.baseMsgSenderFactory.genHttpSenderByClusterId(configure);
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index 91b1735155..ad891c971b 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
 import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -86,7 +88,6 @@ public class MsgSenderSingleFactory implements 
MsgSenderFactory {
         if (!initialized.get()) {
             throw new ProxySdkException("Please initialize the factory 
first!");
         }
-        ProxyUtils.validProxyConfigNotNull(configure);
         return baseMsgSenderFactory.genTcpSenderByGroupId(configure, 
selfDefineFactory);
     }
 
@@ -102,7 +103,22 @@ public class MsgSenderSingleFactory implements 
MsgSenderFactory {
         if (!initialized.get()) {
             throw new ProxySdkException("Please initialize the factory 
first!");
         }
-        ProxyUtils.validProxyConfigNotNull(configure);
         return baseMsgSenderFactory.genTcpSenderByClusterId(configure, 
selfDefineFactory);
     }
+
+    @Override
+    public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig 
configure) throws ProxySdkException {
+        if (!initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        return baseMsgSenderFactory.genHttpSenderByGroupId(configure);
+    }
+
+    @Override
+    public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig 
configure) throws ProxySdkException {
+        if (!initialized.get()) {
+            throw new ProxySdkException("Please initialize the factory 
first!");
+        }
+        return baseMsgSenderFactory.genHttpSenderByClusterId(configure);
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index 187a1b56b9..b1dd6f26c3 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -93,6 +93,16 @@ public enum ErrorCode {
     DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured 
groupId or streamId"),
     //
     DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
+    //
+    HTTP_ASYNC_POOL_FULL(171, "Http async pool full"),
+    HTTP_ASYNC_OFFER_FAIL(172, "Http async offer event fail"),
+    HTTP_ASYNC_OFFER_EXCEPTION(173, "Http async offer event exception"),
+    HTTP_BUILD_CLIENT_EXCEPTION(174, "Http build client exception"),
+    //
+    BUILD_FORM_CONTENT_EXCEPTION(181, "Build form content exception"),
+    DP_RETURN_FAILURE(182, "DataProxy return failure"),
+    HTTP_VISIT_DP_EXCEPTION(183, "Http visit exception"),
+    DP_RETURN_UNKNOWN_ERROR(184, "DataProxy return unknown error"),
 
     UNKNOWN_ERROR(9999, "Unknown error");
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index cf2d006742..948088bc8a 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -21,6 +21,8 @@ public class SdkConsts {
 
     public static String PREFIX_HTTP = "http://";;
     public static String PREFIX_HTTPS = "https://";;
+    public static final String KEY_HTTP_FIELD_BODY = "body";
+    public static final String KEY_HTTP_FIELD_DELIMITER = "rcdDlmtr";
 
     // dataproxy node config
     public static final String MANAGER_DATAPROXY_API = 
"/inlong/manager/openapi/dataproxy/getIpList/";
@@ -32,6 +34,8 @@ public class SdkConsts {
     public static final String BASIC_AUTH_HEADER = "authorization";
     // default region name
     public static final String VAL_DEF_REGION_NAME = "";
+    // http report method
+    public static final String DATAPROXY_REPORT_METHOD = "/dataproxy/message";
     // config info sync interval in minutes
     public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
     public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
index d1c8b8e19c..c9e438df3c 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
@@ -21,6 +21,8 @@ import org.apache.inlong.common.msg.MsgType;
 import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory;
 import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
 import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
 import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
 import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -74,6 +76,22 @@ public class InLongFactoryExample {
         ProxyUtils.sleepSomeTime(10000L);
         tcpMsgSender.close();
 
+        // report data by http
+        HttpMsgSenderConfig httpMsgSenderConfig = new HttpMsgSenderConfig(
+                false, managerIp, managerPort, groupId, secretId, secretKey);
+        InLongHttpMsgSender httpMsgSender =
+                singleFactory.genHttpSenderByGroupId(httpMsgSenderConfig);
+        if (!httpMsgSender.start(procResult)) {
+            System.out.println("Start http sender failure: process result=" + 
procResult);
+        }
+        ExampleUtils.sendHttpMessages(httpMsgSender, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendHttpMessages(httpMsgSender, false, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ProxyUtils.sleepSomeTime(10000L);
+        httpMsgSender.close();
+        System.out.println("Cur singleton factory sender count is " + 
singleFactory.getMsgSenderCount());
+
         // report data use multi-factory
         MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory();
         MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory();
@@ -104,6 +122,26 @@ public class InLongFactoryExample {
         System.out.println("Multi-1.2 Cur multiFactory1 sender count = "
                 + multiFactory1.getMsgSenderCount()
                 + ", cur multiFactory2 sender count is " + 
multiFactory2.getMsgSenderCount());
+        // report data by http
+        InLongHttpMsgSender httpMsgSender1 =
+                multiFactory1.genHttpSenderByGroupId(httpMsgSenderConfig);
+        HttpMsgSenderConfig httpConfg2 = new HttpMsgSenderConfig(false,
+                managerIp, managerPort, groupId, secretId, secretKey);
+        InLongHttpMsgSender httpMsgSender2 =
+                multiFactory2.genHttpSenderByGroupId(httpConfg2);
+        ExampleUtils.sendHttpMessages(httpMsgSender1, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendHttpMessages(httpMsgSender2, false, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ProxyUtils.sleepSomeTime(10000L);
+        httpMsgSender1.close();
+        System.out.println("Multi-2.1 Cur multiFactory1 sender count = "
+                + multiFactory1.getMsgSenderCount()
+                + ", cur multiFactory2 sender count is " + 
multiFactory2.getMsgSenderCount());
+        httpMsgSender2.close();
+        System.out.println("Multi-2.2 Cur multiFactory1 sender count = "
+                + multiFactory1.getMsgSenderCount()
+                + ", cur multiFactory2 sender count is " + 
multiFactory2.getMsgSenderCount());
 
         // test self DefineFactory
         ThreadFactory selfDefineFactory = new 
DefaultThreadFactory("test_self_thread_factory");
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java
new file mode 100644
index 0000000000..fccdac4a5c
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongHttpClientExample {
+
+    protected static final Logger logger = 
LoggerFactory.getLogger(InLongHttpClientExample.class);
+
+    public static void main(String[] args) throws Exception {
+
+        String managerIp = args[0];
+        String managerPort = args[1];
+        String groupId = args[2];
+        String streamId = args[3];
+        String secretId = args[4];
+        String secretKey = args[5];
+        int reqCnt = Integer.parseInt(args[6]);
+        int msgSize = 1024;
+        int msgCnt = 1;
+        if (args.length > 7) {
+            msgSize = Integer.parseInt(args[7]);
+            msgCnt = Integer.parseInt(args[8]);
+        }
+
+        String managerAddr = "http://"; + managerIp + ":" + managerPort;
+
+        HttpMsgSenderConfig dataProxyConfig =
+                new HttpMsgSenderConfig(managerAddr, groupId, secretId, 
secretKey);
+        InLongHttpMsgSender messageSender = new 
InLongHttpMsgSender(dataProxyConfig);
+
+        ProcessResult procResult = new ProcessResult();
+        if (!messageSender.start(procResult)) {
+            System.out.println("Start http sender failure: process result=" + 
procResult);
+        }
+
+        System.out.println("InLongHttpMsgSender start, nodes="
+                + messageSender.getProxyNodeInfos());
+
+        ExampleUtils.sendHttpMessages(messageSender, true, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendHttpMessages(messageSender, true, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendHttpMessages(messageSender, false, false,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+        ExampleUtils.sendHttpMessages(messageSender, false, true,
+                groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+
+        ProxyUtils.sleepSomeTime(10000L);
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
index 51452d528a..382ba58ad8 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
@@ -52,8 +52,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+@Deprecated
 /**
- * internal http sender
+ * Replace by InLongHttpMsgSender
  */
 public class InternalHttpSender {
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 1b5653bc46..bd06d38b7f 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -37,8 +37,10 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
+@Deprecated
 /**
  * http sender
+ * Replace by InLongHttpMsgSender
  */
 public class HttpProxySender extends Thread {
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java
new file mode 100644
index 0000000000..e58644fb29
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.http;
+
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+
+/**
+ * HTTP Asynchronously Object class
+ *
+ * Used to carry the reported message content
+ */
+public class HttpAsyncObj {
+
+    private final HttpEventInfo httpEvent;
+    private final MsgSendCallback callback;
+    private final long rptMs;
+
+    public HttpAsyncObj(HttpEventInfo httpEvent, MsgSendCallback callback) {
+        this.httpEvent = httpEvent;
+        this.callback = callback;
+        this.rptMs = System.currentTimeMillis();
+    }
+
+    public HttpEventInfo getHttpEvent() {
+        return httpEvent;
+    }
+
+    public MsgSendCallback getCallback() {
+        return callback;
+    }
+
+    public long getRptMs() {
+        return rptMs;
+    }
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
new file mode 100644
index 0000000000..4e875359cf
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.http;
+
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.HttpUtils;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * HTTP Client Manager class
+ *
+ * Used to manage HTTP clients, including periodically selecting proxy nodes,
+ *  finding available nodes when reporting messages, maintaining inflight 
message
+ *  sending status, finding responses to corresponding requests, etc.
+ */
+public class HttpClientMgr implements ClientMgr {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpClientMgr.class);
+    private static final LogCounter updConExptCnt = new LogCounter(10, 100000, 
60 * 1000L);
+    private static final LogCounter sendMsgExptCnt = new LogCounter(10, 
100000, 60 * 1000L);
+    private static final LogCounter asyncSendExptCnt = new LogCounter(10, 
100000, 60 * 1000L);
+
+    private final BaseSender sender;
+    private final HttpMsgSenderConfig httpConfig;
+    private CloseableHttpClient httpClient;
+    private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
+    private final Semaphore asyncIdleCellCnt;
+    private final ExecutorService workerServices = 
Executors.newCachedThreadPool();
+    private final AtomicBoolean shutDown = new AtomicBoolean(false);
+    // meta info
+    private ConcurrentHashMap<String, HostInfo> usingNodeMaps = new 
ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, Long> connFailNodeMap = new 
ConcurrentHashMap<>();
+    // current using nodes
+    private List<String> activeNodes = new ArrayList<>();
+    private volatile long lastUpdateTime = -1;
+    // node select index
+    private final AtomicInteger reqSendIndex = new AtomicInteger(0);
+
+    public HttpClientMgr(BaseSender sender, HttpMsgSenderConfig httpConfig) {
+        this.sender = sender;
+        this.httpConfig = httpConfig;
+        this.messageCache = new 
LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize());
+        this.asyncIdleCellCnt = new 
Semaphore(httpConfig.getHttpAsyncRptCacheSize(), true);
+    }
+
+    @Override
+    public boolean start(ProcessResult procResult) {
+        // build http client
+        if (!HttpUtils.constructHttpClient(httpConfig.isRptDataByHttps(),
+                httpConfig.getHttpSocketTimeoutMs(), 
httpConfig.getHttpConTimeoutMs(),
+                httpConfig.getTlsVersion(), procResult)) {
+            return false;
+        }
+        this.httpClient = (CloseableHttpClient) procResult.getRetData();
+        // build async report workers
+        for (int i = 0; i < httpConfig.getHttpAsyncRptWorkerNum(); i++) {
+            workerServices.execute(new HttpAsyncReportWorker(i));
+        }
+        logger.info("ClientMgr({}) started!", this.sender.getSenderId());
+        return procResult.setSuccess();
+    }
+
+    /**
+     * close resources
+     */
+    @Override
+    public void stop() {
+        if (!this.shutDown.compareAndSet(false, true)) {
+            return;
+        }
+        int remainCnt = 0;
+        if (!messageCache.isEmpty()) {
+            long startTime = System.currentTimeMillis();
+            while (!messageCache.isEmpty()) {
+                if (System.currentTimeMillis() - startTime >= 
httpConfig.getHttpCloseWaitPeriodMs()) {
+                    break;
+                }
+                ProxyUtils.sleepSomeTime(100L);
+            }
+            remainCnt = messageCache.size();
+            messageCache.clear();
+        }
+        workerServices.shutdown();
+        if (httpClient != null) {
+            try {
+                httpClient.close();
+            } catch (Throwable ignore) {
+                //
+            }
+        }
+        logger.info("ClientMgr({}) stopped, remain ({}) messages discarded!",
+                this.sender.getSenderId(), remainCnt);
+    }
+
+    @Override
+    public int getInflightMsgCnt() {
+        return this.messageCache.size();
+    }
+
+    @Override
+    public int getActiveNodeCnt() {
+        return activeNodes.size();
+    }
+
+    @Override
+    public void updateProxyInfoList(boolean nodeChanged, 
ConcurrentHashMap<String, HostInfo> hostInfoMap) {
+        if (hostInfoMap.isEmpty() || this.shutDown.get()) {
+            return;
+        }
+        long curTime = System.currentTimeMillis();
+        try {
+            // shuffle candidate nodes
+            List<HostInfo> candidateNodes = new 
ArrayList<>(hostInfoMap.size());
+            candidateNodes.addAll(hostInfoMap.values());
+            Collections.sort(candidateNodes);
+            Collections.shuffle(candidateNodes);
+            int curTotalCnt = candidateNodes.size();
+            int needActiveCnt = Math.min(httpConfig.getAliveConnections(), 
curTotalCnt);
+            // build next step nodes
+            Long lstFailTime;
+            int maxCycleCnt = 3;
+            this.connFailNodeMap.clear();
+            List<String> realHosts = new ArrayList<>();
+            ConcurrentHashMap<String, HostInfo> tmpNodeMaps = new 
ConcurrentHashMap<>();
+            do {
+                int selectCnt = 0;
+                long selectTime = System.currentTimeMillis();
+                for (HostInfo hostInfo : candidateNodes) {
+                    if (realHosts.contains(hostInfo.getReferenceName())) {
+                        continue;
+                    }
+                    lstFailTime = 
this.connFailNodeMap.get(hostInfo.getReferenceName());
+                    if (lstFailTime != null
+                            && selectTime - lstFailTime <= 
httpConfig.getHttpNodeReuseWaitIfFailMs()) {
+                        continue;
+                    }
+                    tmpNodeMaps.put(hostInfo.getReferenceName(), hostInfo);
+                    realHosts.add(hostInfo.getReferenceName());
+                    if (lstFailTime != null) {
+                        
this.connFailNodeMap.remove(hostInfo.getReferenceName());
+                    }
+                    if (++selectCnt >= needActiveCnt) {
+                        break;
+                    }
+                }
+                if (!realHosts.isEmpty()) {
+                    break;
+                }
+                ProxyUtils.sleepSomeTime(1000L);
+            } while (--maxCycleCnt > 0);
+            // update active nodes
+            if (realHosts.isEmpty()) {
+                if (nodeChanged) {
+                    logger.error("ClientMgr({}) changed nodes, but all nodes 
failure, nodes={}, failNodes={}!",
+                            this.sender.getSenderId(), candidateNodes, 
connFailNodeMap);
+                } else {
+                    logger.error("ClientMgr({}) re-choose nodes, but all nodes 
failure, nodes={}, failNodes={}!",
+                            this.sender.getSenderId(), candidateNodes, 
connFailNodeMap);
+                }
+            } else {
+                this.lastUpdateTime = System.currentTimeMillis();
+                this.usingNodeMaps = tmpNodeMaps;
+                this.activeNodes = realHosts;
+                if (nodeChanged) {
+                    logger.info("ClientMgr({}) changed nodes, wast {}ms, 
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+                            this.sender.getSenderId(), 
(System.currentTimeMillis() - curTime),
+                            needActiveCnt, realHosts.size(), realHosts, 
connFailNodeMap.keySet());
+                } else {
+                    logger.info("ClientMgr({}) re-choose nodes, wast {}ms, 
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+                            this.sender.getSenderId(), 
(System.currentTimeMillis() - curTime),
+                            needActiveCnt, realHosts.size(), realHosts, 
connFailNodeMap.keySet());
+                }
+            }
+        } catch (Throwable ex) {
+            if (updConExptCnt.shouldPrint()) {
+                logger.warn("ClientMgr({}) update nodes throw exception",
+                        this.sender.getSenderId(), ex);
+            }
+        }
+    }
+
+    public boolean asyncSendMessage(HttpAsyncObj asyncObj, ProcessResult 
procResult) {
+        if (this.shutDown.get()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        List<String> curNodes = this.activeNodes;
+        if (curNodes.isEmpty()) {
+            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+        }
+        if (!this.asyncIdleCellCnt.tryAcquire()) {
+            return procResult.setFailResult(ErrorCode.HTTP_ASYNC_POOL_FULL);
+        }
+        boolean released = false;
+        try {
+            if (!this.messageCache.offer(asyncObj)) {
+                this.asyncIdleCellCnt.release();
+                released = true;
+                return 
procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_FAIL);
+            }
+            return procResult.setSuccess();
+        } catch (Throwable ex) {
+            if (!released) {
+                this.asyncIdleCellCnt.release();
+            }
+            if (asyncSendExptCnt.shouldPrint()) {
+                logger.warn("ClientMgr({}) async offer event exception", 
this.sender.getSenderId(), ex);
+            }
+            return 
procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_EXCEPTION, ex.getMessage());
+        }
+    }
+
+    /**
+     * send message to remote nodes
+     */
+    public boolean sendMessage(HttpEventInfo httpEvent, ProcessResult 
procResult) {
+        if (this.shutDown.get()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        List<String> curNodes = this.activeNodes;
+        int curNodeSize = curNodes.size();
+        if (curNodeSize == 0) {
+            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+        }
+        String curNode;
+        HostInfo hostInfo;
+        Long lstFailTime;
+        int nullNodeCnt = 0;
+        HostInfo back1thNode = null;
+        long nodeSelectTime = System.currentTimeMillis();
+        int startPos = reqSendIndex.getAndIncrement();
+        for (int index = 0; index < curNodeSize; index++) {
+            curNode = curNodes.get(Math.abs(startPos++) % curNodeSize);
+            hostInfo = usingNodeMaps.get(curNode);
+            if (hostInfo == null) {
+                nullNodeCnt++;
+                continue;
+            }
+            lstFailTime = connFailNodeMap.get(hostInfo.getReferenceName());
+            if (lstFailTime != null) {
+                if (nodeSelectTime - lstFailTime <= 
httpConfig.getHttpNodeReuseWaitIfFailMs()) {
+                    back1thNode = hostInfo;
+                    continue;
+                }
+                connFailNodeMap.remove(hostInfo.getReferenceName(), 
lstFailTime);
+            }
+            return innSendMsgByHttp(httpEvent, hostInfo, procResult);
+        }
+        if (nullNodeCnt == curNodeSize) {
+            return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+        }
+        if (back1thNode != null) {
+            return innSendMsgByHttp(httpEvent, back1thNode, procResult);
+        }
+        return procResult.setFailResult(ErrorCode.NO_VALID_REMOTE_NODE);
+    }
+
+    /**
+     * send request to DataProxy over http
+     */
+    private boolean innSendMsgByHttp(HttpEventInfo httpEvent, HostInfo 
hostInfo, ProcessResult procResult) {
+        String rmtRptUrl = (httpConfig.isRptDataByHttps() ? 
SdkConsts.PREFIX_HTTPS : SdkConsts.PREFIX_HTTP)
+                + hostInfo.getReferenceName()
+                + SdkConsts.DATAPROXY_REPORT_METHOD;
+        if (!buildFormUrlPost(rmtRptUrl, httpEvent, procResult)) {
+            return false;
+        }
+        HttpPost httpPost = (HttpPost) procResult.getRetData();
+        CloseableHttpResponse response = null;
+        try {
+            response = httpClient.execute(httpPost);
+            String returnStr = EntityUtils.toString(response.getEntity());
+            int returnCode = response.getStatusLine().getStatusCode();
+            if (HttpStatus.SC_OK != returnCode) {
+                if (sendMsgExptCnt.shouldPrint()) {
+                    logger.warn("ClientMgr({}) report event failure, 
errCode={}, returnStr={}",
+                            this.sender.getSenderId(), returnCode, returnStr);
+                }
+                if (response.getStatusLine().getStatusCode() >= 500) {
+                    this.connFailNodeMap.put(hostInfo.getReferenceName(), 
System.currentTimeMillis());
+                }
+                return procResult.setFailResult(ErrorCode.RMT_RETURN_FAILURE,
+                        response.getStatusLine().getStatusCode() + ":" + 
returnStr);
+            }
+            if (StringUtils.isBlank(returnStr)) {
+                return 
procResult.setFailResult(ErrorCode.RMT_RETURN_BLANK_CONTENT);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("success to report event, url={}, result={}",
+                        rmtRptUrl, returnStr);
+            }
+            JsonObject jsonResponse = 
JsonParser.parseString(returnStr).getAsJsonObject();
+            JsonElement codeElement = jsonResponse.get("code");
+            JsonElement msgElement = jsonResponse.get("msg");
+            if (codeElement != null) {
+                int errCode = codeElement.getAsInt();
+                if (errCode == DataProxyErrCode.SUCCESS.getErrCode()) {
+                    return procResult.setSuccess();
+                } else {
+                    return 
procResult.setFailResult(ErrorCode.DP_RETURN_FAILURE,
+                            errCode + ":" + (msgElement != null ? 
msgElement.getAsString() : ""));
+                }
+            }
+            return procResult.setFailResult(ErrorCode.DP_RETURN_UNKNOWN_ERROR, 
returnStr);
+        } catch (Throwable ex) {
+            if (sendMsgExptCnt.shouldPrint()) {
+                logger.warn("ClientMgr({}) report event exception, url={}",
+                        this.sender.getSenderId(), rmtRptUrl, ex);
+            }
+            return procResult.setFailResult(ErrorCode.HTTP_VISIT_DP_EXCEPTION, 
ex.getMessage());
+        } finally {
+            if (httpPost != null) {
+                httpPost.releaseConnection();
+            }
+            if (response != null) {
+                try {
+                    response.close();
+                } catch (Throwable ex) {
+                    if (sendMsgExptCnt.shouldPrint()) {
+                        logger.warn("ClientMgr({}) close response exception, 
url={}",
+                                this.sender.getSenderId(), rmtRptUrl, ex);
+                    }
+                }
+            }
+        }
+    }
+
+    private boolean buildFormUrlPost(
+            String rmtRptUrl, HttpEventInfo httpEvent, ProcessResult 
procResult) {
+        ArrayList<BasicNameValuePair> contents = new ArrayList<>();
+        try {
+            HttpPost httpPost = new HttpPost(rmtRptUrl);
+            httpPost.setHeader(HttpHeaders.CONNECTION,
+                    HttpHeaderValues.CLOSE.toString());
+            httpPost.setHeader(HttpHeaders.CONTENT_TYPE,
+                    
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
+            contents.add(new BasicNameValuePair(AttributeConstants.GROUP_ID,
+                    httpEvent.getGroupId()));
+            contents.add(new BasicNameValuePair(AttributeConstants.STREAM_ID,
+                    httpEvent.getStreamId()));
+            contents.add(new BasicNameValuePair(AttributeConstants.DATA_TIME,
+                    String.valueOf(httpEvent.getDtMs())));
+            contents.add(new BasicNameValuePair(SdkConsts.KEY_HTTP_FIELD_BODY,
+                    StringUtils.join(httpEvent.getBodyList(), 
httpConfig.getHttpEventsSeparator())));
+            contents.add(new 
BasicNameValuePair(AttributeConstants.MESSAGE_COUNT,
+                    String.valueOf(httpEvent.getMsgCnt())));
+            if (!httpConfig.isSepEventByLF()) {
+                contents.add(new 
BasicNameValuePair(SdkConsts.KEY_HTTP_FIELD_DELIMITER,
+                        httpConfig.getHttpEventsSeparator()));
+            }
+            String encodedContents = URLEncodedUtils.format(contents, 
StandardCharsets.UTF_8);
+            httpPost.setEntity(new StringEntity(encodedContents));
+            if (logger.isDebugEnabled()) {
+                logger.debug("begin to post request to {}, encoded content is: 
{}",
+                        rmtRptUrl, encodedContents);
+            }
+            return procResult.setSuccess(httpPost);
+        } catch (Throwable ex) {
+            if (sendMsgExptCnt.shouldPrint()) {
+                logger.warn("ClientMgr({}) build form-url content failure, 
content={}",
+                        this.sender.getSenderId(), contents, ex);
+            }
+            return 
procResult.setFailResult(ErrorCode.BUILD_FORM_CONTENT_EXCEPTION, 
ex.getMessage());
+        }
+    }
+
+    /**
+     * check cache runner
+     */
+    private class HttpAsyncReportWorker implements Runnable {
+
+        private final String workerId;
+
+        public HttpAsyncReportWorker(int workerId) {
+            this.workerId = sender.getSenderId() + "-" + workerId;
+        }
+
+        @Override
+        public void run() {
+            long curTime = 0;
+            HttpAsyncObj asyncObj;
+            ProcessResult procResult = new ProcessResult();
+            logger.info("HttpAsyncReportWorker({}) started", this.workerId);
+            // if not shutdown or queue is not empty
+            while (!shutDown.get() || !messageCache.isEmpty()) {
+                while (!messageCache.isEmpty()) {
+                    asyncObj = messageCache.poll();
+                    if (asyncObj == null) {
+                        continue;
+                    }
+                    try {
+                        sendMessage(asyncObj.getHttpEvent(), procResult);
+                        curTime = System.currentTimeMillis();
+                        asyncObj.getCallback().onMessageAck(procResult);
+                    } catch (Throwable ex) {
+                        if (asyncSendExptCnt.shouldPrint()) {
+                            logger.error("HttpAsync({}) report event 
exception", workerId, ex);
+                        }
+                    } finally {
+                        asyncIdleCellCnt.release();
+                    }
+                }
+                
ProxyUtils.sleepSomeTime(httpConfig.getHttpAsyncWorkerIdleWaitMs());
+            }
+            logger.info("HttpAsyncReportWorker({}) stopped", this.workerId);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java
similarity index 94%
rename from 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java
rename to 
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java
index 0db9261c70..9020b6b7d6 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sdk.dataproxy.common;
+package org.apache.inlong.sdk.dataproxy.network.http;
 
 /**
  * HTTP Report Content Type enum
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index c103bd31f4..3b6485eac3 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -83,6 +83,8 @@ public abstract class BaseSender implements ConfigHolder {
         this.senderFactory = senderFactory;
         this.factoryClusterIdKey = clusterIdKey;
         this.senderId = configure.getDataRptProtocol() + "-" + 
senderIdGen.incrementAndGet();
+        this.configManager = new ProxyConfigManager(this.senderId, 
this.baseConfig, this);
+        this.configManager.setDaemon(true);
     }
 
     public boolean start(ProcessResult procResult) {
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
index 6c7e5ac8b0..d05f4c821e 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
@@ -18,11 +18,11 @@
 package org.apache.inlong.sdk.dataproxy.sender.http;
 
 import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.sdk.dataproxy.common.HttpContentType;
 import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.common.ReportProtocol;
 import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
 import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.http.HttpContentType;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
new file mode 100644
index 0000000000..e7d0b44a34
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.http;
+
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.network.http.HttpAsyncObj;
+import org.apache.inlong.sdk.dataproxy.network.http.HttpClientMgr;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+/**
+ * HTTP(s) Message Sender class
+ *
+ * Used to define the HTTP(s) sender common methods
+ */
+public class InLongHttpMsgSender extends BaseSender implements HttpMsgSender {
+
+    protected static final LogCounter httpExceptCnt = new LogCounter(10, 
100000, 60 * 1000L);
+    private final HttpClientMgr httpClientMgr;
+    private final HttpMsgSenderConfig httpConfig;
+
+    public InLongHttpMsgSender(HttpMsgSenderConfig configure) {
+        this(configure, null, null);
+    }
+
+    public InLongHttpMsgSender(HttpMsgSenderConfig configure, MsgSenderFactory 
senderFactory, String clusterIdKey) {
+        super(configure, senderFactory, clusterIdKey);
+        this.httpConfig = (HttpMsgSenderConfig) baseConfig;
+        this.clientMgr = new HttpClientMgr(this, this.httpConfig);
+        this.httpClientMgr = (HttpClientMgr) clientMgr;
+    }
+
+    @Override
+    public boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult 
procResult) {
+        validParamsNotNull(eventInfo, procResult);
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        if (this.isMetaInfoUnReady()) {
+            return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
+        }
+        // check package length
+        if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(), 
procResult)) {
+            return false;
+        }
+        return httpClientMgr.sendMessage(eventInfo, procResult);
+    }
+
+    @Override
+    public boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback 
callback, ProcessResult procResult) {
+        validParamsNotNull(eventInfo, callback, procResult);
+        if (!this.isStarted()) {
+            return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+        }
+        if (this.isMetaInfoUnReady()) {
+            return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
+        }
+        // check package length
+        if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(), 
procResult)) {
+            return false;
+        }
+        return httpClientMgr.asyncSendMessage(new HttpAsyncObj(eventInfo, 
callback), procResult);
+    }
+
+    @Override
+    public int getActiveNodeCnt() {
+        return httpClientMgr.getActiveNodeCnt();
+    }
+
+    @Override
+    public int getInflightMsgCnt() {
+        return httpClientMgr.getInflightMsgCnt();
+    }
+
+    private boolean isValidPkgLength(HttpEventInfo eventInfo, int allowedLen, 
ProcessResult procResult) {
+        // Not valid if the maximum limit is less than or equal to 0
+        if (allowedLen < 0) {
+            return true;
+        }
+        int eventLen = eventInfo.getBodySize()
+                + eventInfo.getGroupId().length()
+                + eventInfo.getStreamId().length()
+                + String.valueOf(eventInfo.getDtMs()).length();
+        // Reserve space for attribute
+        if (eventLen > allowedLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) {
+            String errMsg = String.format("OverMaxLen: content length(%d) > 
allowedLen(%d) - fixedLen(%d)",
+                    eventLen, allowedLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH);
+            if (httpExceptCnt.shouldPrint()) {
+                logger.warn(errMsg);
+            }
+            return 
procResult.setFailResult(ErrorCode.REPORT_INFO_EXCEED_MAX_LEN, errMsg);
+        }
+        return true;
+    }
+
+    private void validParamsNotNull(HttpEventInfo eventInfo, ProcessResult 
procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+    }
+
+    private void validParamsNotNull(HttpEventInfo eventInfo, MsgSendCallback 
callback, ProcessResult procResult) {
+        if (eventInfo == null) {
+            throw new NullPointerException("eventInfo is null");
+        }
+        if (callback == null) {
+            throw new NullPointerException("callback is null");
+        }
+        if (procResult == null) {
+            throw new NullPointerException("procResult is null");
+        }
+    }
+
+}
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java
new file mode 100644
index 0000000000..228ee59a24
--- /dev/null
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContexts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Http(s) Utils class
+ *
+ * Used to place public processing functions related to HTTP(s)
+ */
+public class HttpUtils {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(HttpUtils.class);
+    private static final LogCounter exceptCnt = new LogCounter(10, 200000, 60 
* 1000L);
+
+    public static boolean constructHttpClient(boolean rptByHttps,
+            int socketTimeoutMs, int conTimeoutMs, String tlsVer, 
ProcessResult procResult) {
+        CloseableHttpClient httpClient;
+        RequestConfig requestConfig = RequestConfig.custom()
+                .setSocketTimeout(socketTimeoutMs)
+                .setConnectTimeout(conTimeoutMs).build();
+        try {
+            if (rptByHttps) {
+                SSLContext sslContext = SSLContexts.custom().build();
+                SSLConnectionSocketFactory sslSf = new 
SSLConnectionSocketFactory(sslContext,
+                        new String[]{tlsVer}, null,
+                        
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+                httpClient = HttpClients.custom()
+                        .setDefaultRequestConfig(requestConfig)
+                        .setSSLSocketFactory(sslSf).build();
+            } else {
+                httpClient = HttpClientBuilder.create()
+                        .setDefaultRequestConfig(requestConfig).build();
+            }
+            return procResult.setSuccess(httpClient);
+        } catch (Throwable ex) {
+            if (exceptCnt.shouldPrint()) {
+                logger.error("Build http client exception", ex);
+            }
+            return 
procResult.setFailResult(ErrorCode.HTTP_BUILD_CLIENT_EXCEPTION, 
ex.getMessage());
+        }
+    }
+}

Reply via email to