This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ea0c20407a [INLONG-11706][SDK] Optimize HTTP Sender implementation
(#11707)
ea0c20407a is described below
commit ea0c20407aceaeb0fb7b1d3a985871d209496cac
Author: Goson Zhang <[email protected]>
AuthorDate: Wed Jan 22 17:49:09 2025 +0800
[INLONG-11706][SDK] Optimize HTTP Sender implementation (#11707)
* [INLONG-11706][SDK] Optimize HTTP Sender implementation
* [INLONG-11706][SDK] Optimize HTTP Sender implementation
---------
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dataproxy/BaseMsgSenderFactory.java | 112 ++++-
.../inlong/sdk/dataproxy/MsgSenderFactory.java | 20 +
.../sdk/dataproxy/MsgSenderMultiFactory.java | 21 +-
.../sdk/dataproxy/MsgSenderSingleFactory.java | 20 +-
.../inlong/sdk/dataproxy/common/ErrorCode.java | 10 +
.../inlong/sdk/dataproxy/common/SdkConsts.java | 4 +
.../dataproxy/example/InLongFactoryExample.java | 38 ++
.../dataproxy/example/InLongHttpClientExample.java | 73 ++++
.../sdk/dataproxy/http/InternalHttpSender.java | 3 +-
.../sdk/dataproxy/network/HttpProxySender.java | 2 +
.../sdk/dataproxy/network/http/HttpAsyncObj.java | 51 +++
.../sdk/dataproxy/network/http/HttpClientMgr.java | 461 +++++++++++++++++++++
.../{common => network/http}/HttpContentType.java | 2 +-
.../inlong/sdk/dataproxy/sender/BaseSender.java | 2 +
.../dataproxy/sender/http/HttpMsgSenderConfig.java | 2 +-
.../dataproxy/sender/http/InLongHttpMsgSender.java | 136 ++++++
.../inlong/sdk/dataproxy/utils/HttpUtils.java | 71 ++++
17 files changed, 1010 insertions(+), 18 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
index 0bc0cf4bb9..50fa183c58 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/BaseMsgSenderFactory.java
@@ -23,6 +23,8 @@ import
org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
@@ -69,9 +71,9 @@ public class BaseMsgSenderFactory {
senderCacheLock.writeLock().lock();
try {
// release groupId mapped senders
- totalSenderCnt = innReleaseAllGroupIdSenders(groupIdSenderMap);
+ totalSenderCnt = releaseAllGroupIdSenders(groupIdSenderMap);
// release clusterId mapped senders
- totalSenderCnt +=
innReleaseAllClusterIdSenders(clusterIdSenderMap);
+ totalSenderCnt += releaseAllClusterIdSenders(clusterIdSenderMap);
} finally {
senderCacheLock.writeLock().unlock();
}
@@ -90,9 +92,9 @@ public class BaseMsgSenderFactory {
senderCacheLock.writeLock().lock();
try {
if (msgSender.getFactoryClusterIdKey() == null) {
- removed = innRemoveGroupIdSender(msgSender, groupIdSenderMap);
+ removed = removeGroupIdSender(msgSender, groupIdSenderMap);
} else {
- removed = innRemoveClusterIdSender(msgSender,
clusterIdSenderMap);
+ removed = removeClusterIdSender(msgSender, clusterIdSenderMap);
}
} finally {
senderCacheLock.writeLock().unlock();
@@ -108,7 +110,7 @@ public class BaseMsgSenderFactory {
public InLongTcpMsgSender genTcpSenderByGroupId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
- ProxyUtils.validProxyConfigNotNull(configure);
+ validProxyConfigNotNull(configure);
// query cached sender
String metaConfigKey = configure.getGroupMetaConfigKey();
InLongTcpMsgSender messageSender =
@@ -148,9 +150,51 @@ public class BaseMsgSenderFactory {
}
}
+ public InLongHttpMsgSender genHttpSenderByGroupId(
+ HttpMsgSenderConfig configure) throws ProxySdkException {
+ validProxyConfigNotNull(configure);
+ // query cached sender
+ String metaConfigKey = configure.getGroupMetaConfigKey();
+ InLongHttpMsgSender messageSender =
+ (InLongHttpMsgSender) groupIdSenderMap.get(metaConfigKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // valid configure info
+ ProcessResult procResult = new ProcessResult();
+ qryProxyMetaConfigure(configure, procResult);
+ // generate sender
+ senderCacheLock.writeLock().lock();
+ try {
+ // re-get the created sender based on the groupId key after locked
+ messageSender = (InLongHttpMsgSender)
groupIdSenderMap.get(metaConfigKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // build a new sender based on groupId
+ messageSender = new InLongHttpMsgSender(configure,
msgSenderFactory, null);
+ if (!messageSender.start(procResult)) {
+ messageSender.close();
+ throw new ProxySdkException("Failed to start groupId sender: "
+ procResult);
+ }
+ groupIdSenderMap.put(metaConfigKey, messageSender);
+ logger.info("MsgSenderFactory({}) generated a new groupId({})
sender({})",
+ this.factoryNo, metaConfigKey,
messageSender.getSenderId());
+ return messageSender;
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("MsgSenderFactory({}) build groupId sender({})
exception",
+ this.factoryNo, metaConfigKey, ex);
+ }
+ throw new ProxySdkException("Failed to build groupId sender: " +
ex.getMessage());
+ } finally {
+ senderCacheLock.writeLock().unlock();
+ }
+ }
+
public InLongTcpMsgSender genTcpSenderByClusterId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException {
- ProxyUtils.validProxyConfigNotNull(configure);
+ validProxyConfigNotNull(configure);
// get groupId's clusterIdKey
ProcessResult procResult = new ProcessResult();
ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure,
procResult);;
@@ -191,6 +235,48 @@ public class BaseMsgSenderFactory {
}
}
+ public InLongHttpMsgSender genHttpSenderByClusterId(
+ HttpMsgSenderConfig configure) throws ProxySdkException {
+ validProxyConfigNotNull(configure);
+ // get groupId's clusterIdKey
+ ProcessResult procResult = new ProcessResult();
+ ProxyConfigEntry proxyConfigEntry = qryProxyMetaConfigure(configure,
procResult);;
+ String clusterIdKey = ProxyUtils.buildClusterIdKey(
+ configure.getDataRptProtocol(), configure.getRegionName(),
proxyConfigEntry.getClusterId());
+ // get local built sender
+ InLongHttpMsgSender messageSender = (InLongHttpMsgSender)
clusterIdSenderMap.get(clusterIdKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // generate sender
+ senderCacheLock.writeLock().lock();
+ try {
+ // re-get the created sender based on the clusterId Key after
locked
+ messageSender = (InLongHttpMsgSender)
clusterIdSenderMap.get(clusterIdKey);
+ if (messageSender != null) {
+ return messageSender;
+ }
+ // build a new sender based on clusterId Key
+ messageSender = new InLongHttpMsgSender(configure,
msgSenderFactory, clusterIdKey);
+ if (!messageSender.start(procResult)) {
+ messageSender.close();
+ throw new ProxySdkException("Failed to start cluster sender: "
+ procResult);
+ }
+ clusterIdSenderMap.put(clusterIdKey, messageSender);
+ logger.info("MsgSenderFactory({}) generated a new clusterId({})
sender({})",
+ this.factoryNo, clusterIdKey, messageSender.getSenderId());
+ return messageSender;
+ } catch (Throwable ex) {
+ if (exptCounter.shouldPrint()) {
+ logger.warn("MsgSenderFactory({}) build cluster sender({})
exception",
+ this.factoryNo, clusterIdKey, ex);
+ }
+ throw new ProxySdkException("Failed to build cluster sender: " +
ex.getMessage());
+ } finally {
+ senderCacheLock.writeLock().unlock();
+ }
+ }
+
private ProxyConfigEntry qryProxyMetaConfigure(
ProxyClientConfig proxyConfig, ProcessResult procResult) throws
ProxySdkException {
ProxyConfigManager inlongMetaQryMgr = new
ProxyConfigManager(proxyConfig);
@@ -205,7 +291,7 @@ public class BaseMsgSenderFactory {
return inlongMetaQryMgr.getProxyConfigEntry();
}
- private boolean innRemoveGroupIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
+ private boolean removeGroupIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
BaseSender tmpSender = senderMap.get(msgSender.getMetaConfigKey());
if (tmpSender == null
|| !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
@@ -214,7 +300,7 @@ public class BaseMsgSenderFactory {
return senderMap.remove(msgSender.getMetaConfigKey()) != null;
}
- private boolean innRemoveClusterIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
+ private boolean removeClusterIdSender(BaseSender msgSender, Map<String,
BaseSender> senderMap) {
BaseSender tmpSender =
senderMap.get(msgSender.getFactoryClusterIdKey());
if (tmpSender == null
|| !tmpSender.getSenderId().equals(msgSender.getSenderId())) {
@@ -223,7 +309,7 @@ public class BaseMsgSenderFactory {
return senderMap.remove(msgSender.getFactoryClusterIdKey()) != null;
}
- private int innReleaseAllGroupIdSenders(Map<String, BaseSender> senderMap)
{
+ private int releaseAllGroupIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
if (entry == null || entry.getValue() == null) {
@@ -243,7 +329,7 @@ public class BaseMsgSenderFactory {
return totalSenderCnt;
}
- private int innReleaseAllClusterIdSenders(Map<String, BaseSender>
senderMap) {
+ private int releaseAllClusterIdSenders(Map<String, BaseSender> senderMap) {
int totalSenderCnt = 0;
for (Map.Entry<String, BaseSender> entry : senderMap.entrySet()) {
if (entry == null
@@ -264,4 +350,10 @@ public class BaseMsgSenderFactory {
senderMap.clear();
return totalSenderCnt;
}
+
+ private void validProxyConfigNotNull(ProxyClientConfig configure) throws
ProxySdkException {
+ if (configure == null) {
+ throw new ProxySdkException("configure is null!");
+ }
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
index a6a4e20b4c..d2169d7152 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderFactory.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
@@ -88,4 +90,22 @@ public interface MsgSenderFactory {
*/
InLongTcpMsgSender genTcpSenderByClusterId(
TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory)
throws ProxySdkException;
+
+ /**
+ * Get or generate a http sender from the factory according to groupId
+ *
+ * @param configure the sender configure
+ * @return the sender
+ */
+ InLongHttpMsgSender genHttpSenderByGroupId(
+ HttpMsgSenderConfig configure) throws ProxySdkException;
+
+ /**
+ * Get or generate a http sender from the factory according to clusterId
+ *
+ * @param configure the sender configure
+ * @return the sender
+ */
+ InLongHttpMsgSender genHttpSenderByClusterId(
+ HttpMsgSenderConfig configure) throws ProxySdkException;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
index 0cb595c261..f42d8b4d61 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderMultiFactory.java
@@ -19,9 +19,10 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
-import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -79,7 +80,6 @@ public class MsgSenderMultiFactory implements
MsgSenderFactory {
if (!this.initialized.get()) {
throw new ProxySdkException("Please initialize the factory
first!");
}
- ProxyUtils.validProxyConfigNotNull(configure);
return this.baseMsgSenderFactory.genTcpSenderByGroupId(configure,
selfDefineFactory);
}
@@ -95,7 +95,22 @@ public class MsgSenderMultiFactory implements
MsgSenderFactory {
if (!this.initialized.get()) {
throw new ProxySdkException("Please initialize the factory
first!");
}
- ProxyUtils.validProxyConfigNotNull(configure);
return this.baseMsgSenderFactory.genTcpSenderByClusterId(configure,
selfDefineFactory);
}
+
+ @Override
+ public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig
configure) throws ProxySdkException {
+ if (!this.initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ return this.baseMsgSenderFactory.genHttpSenderByGroupId(configure);
+ }
+
+ @Override
+ public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig
configure) throws ProxySdkException {
+ if (!this.initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ return this.baseMsgSenderFactory.genHttpSenderByClusterId(configure);
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
index 91b1735155..ad891c971b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MsgSenderSingleFactory.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -86,7 +88,6 @@ public class MsgSenderSingleFactory implements
MsgSenderFactory {
if (!initialized.get()) {
throw new ProxySdkException("Please initialize the factory
first!");
}
- ProxyUtils.validProxyConfigNotNull(configure);
return baseMsgSenderFactory.genTcpSenderByGroupId(configure,
selfDefineFactory);
}
@@ -102,7 +103,22 @@ public class MsgSenderSingleFactory implements
MsgSenderFactory {
if (!initialized.get()) {
throw new ProxySdkException("Please initialize the factory
first!");
}
- ProxyUtils.validProxyConfigNotNull(configure);
return baseMsgSenderFactory.genTcpSenderByClusterId(configure,
selfDefineFactory);
}
+
+ @Override
+ public InLongHttpMsgSender genHttpSenderByGroupId(HttpMsgSenderConfig
configure) throws ProxySdkException {
+ if (!initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ return baseMsgSenderFactory.genHttpSenderByGroupId(configure);
+ }
+
+ @Override
+ public InLongHttpMsgSender genHttpSenderByClusterId(HttpMsgSenderConfig
configure) throws ProxySdkException {
+ if (!initialized.get()) {
+ throw new ProxySdkException("Please initialize the factory
first!");
+ }
+ return baseMsgSenderFactory.genHttpSenderByClusterId(configure);
+ }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
index 187a1b56b9..b1dd6f26c3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/ErrorCode.java
@@ -93,6 +93,16 @@ public enum ErrorCode {
DP_UNCONFIGURED_GROUPID_OR_STREAMID(155, "DataProxy return unconfigured
groupId or streamId"),
//
DP_RECEIVE_FAILURE(160, "DataProxy return message receive failure"),
+ //
+ HTTP_ASYNC_POOL_FULL(171, "Http async pool full"),
+ HTTP_ASYNC_OFFER_FAIL(172, "Http async offer event fail"),
+ HTTP_ASYNC_OFFER_EXCEPTION(173, "Http async offer event exception"),
+ HTTP_BUILD_CLIENT_EXCEPTION(174, "Http build client exception"),
+ //
+ BUILD_FORM_CONTENT_EXCEPTION(181, "Build form content exception"),
+ DP_RETURN_FAILURE(182, "DataProxy return failure"),
+ HTTP_VISIT_DP_EXCEPTION(183, "Http visit exception"),
+ DP_RETURN_UNKNOWN_ERROR(184, "DataProxy return unknown error"),
UNKNOWN_ERROR(9999, "Unknown error");
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
index cf2d006742..948088bc8a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/SdkConsts.java
@@ -21,6 +21,8 @@ public class SdkConsts {
public static String PREFIX_HTTP = "http://";
public static String PREFIX_HTTPS = "https://";
+ public static final String KEY_HTTP_FIELD_BODY = "body";
+ public static final String KEY_HTTP_FIELD_DELIMITER = "rcdDlmtr";
// dataproxy node config
public static final String MANAGER_DATAPROXY_API =
"/inlong/manager/openapi/dataproxy/getIpList/";
@@ -32,6 +34,8 @@ public class SdkConsts {
public static final String BASIC_AUTH_HEADER = "authorization";
// default region name
public static final String VAL_DEF_REGION_NAME = "";
+ // http report method
+ public static final String DATAPROXY_REPORT_METHOD = "/dataproxy/message";
// config info sync interval in minutes
public static final int VAL_DEF_CONFIG_SYNC_INTERVAL_MIN = 3;
public static final int VAL_MIN_CONFIG_SYNC_INTERVAL_MIN = 1;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
index d1c8b8e19c..c9e438df3c 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongFactoryExample.java
@@ -21,6 +21,8 @@ import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory;
import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
@@ -74,6 +76,22 @@ public class InLongFactoryExample {
ProxyUtils.sleepSomeTime(10000L);
tcpMsgSender.close();
+ // report data by http
+ HttpMsgSenderConfig httpMsgSenderConfig = new HttpMsgSenderConfig(
+ false, managerIp, managerPort, groupId, secretId, secretKey);
+ InLongHttpMsgSender httpMsgSender =
+ singleFactory.genHttpSenderByGroupId(httpMsgSenderConfig);
+ if (!httpMsgSender.start(procResult)) {
+ System.out.println("Start http sender failure: process result=" +
procResult);
+ }
+ ExampleUtils.sendHttpMessages(httpMsgSender, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendHttpMessages(httpMsgSender, false, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ProxyUtils.sleepSomeTime(10000L);
+ httpMsgSender.close();
+ System.out.println("Cur singleton factory sender count is " +
singleFactory.getMsgSenderCount());
+
// report data use multi-factory
MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory();
MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory();
@@ -104,6 +122,26 @@ public class InLongFactoryExample {
System.out.println("Multi-1.2 Cur multiFactory1 sender count = "
+ multiFactory1.getMsgSenderCount()
+ ", cur multiFactory2 sender count is " +
multiFactory2.getMsgSenderCount());
+ // report data by http
+ InLongHttpMsgSender httpMsgSender1 =
+ multiFactory1.genHttpSenderByGroupId(httpMsgSenderConfig);
+ HttpMsgSenderConfig httpConfg2 = new HttpMsgSenderConfig(false,
+ managerIp, managerPort, groupId, secretId, secretKey);
+ InLongHttpMsgSender httpMsgSender2 =
+ multiFactory2.genHttpSenderByGroupId(httpConfg2);
+ ExampleUtils.sendHttpMessages(httpMsgSender1, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendHttpMessages(httpMsgSender2, false, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ProxyUtils.sleepSomeTime(10000L);
+ httpMsgSender1.close();
+ System.out.println("Multi-2.1 Cur multiFactory1 sender count = "
+ + multiFactory1.getMsgSenderCount()
+ + ", cur multiFactory2 sender count is " +
multiFactory2.getMsgSenderCount());
+ httpMsgSender2.close();
+ System.out.println("Multi-2.2 Cur multiFactory1 sender count = "
+ + multiFactory1.getMsgSenderCount()
+ + ", cur multiFactory2 sender count is " +
multiFactory2.getMsgSenderCount());
// test self DefineFactory
ThreadFactory selfDefineFactory = new
DefaultThreadFactory("test_self_thread_factory");
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java
new file mode 100644
index 0000000000..fccdac4a5c
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/InLongHttpClientExample.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.example;
+
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InLongHttpClientExample {
+
+ protected static final Logger logger =
LoggerFactory.getLogger(InLongHttpClientExample.class);
+
+ public static void main(String[] args) throws Exception {
+
+ String managerIp = args[0];
+ String managerPort = args[1];
+ String groupId = args[2];
+ String streamId = args[3];
+ String secretId = args[4];
+ String secretKey = args[5];
+ int reqCnt = Integer.parseInt(args[6]);
+ int msgSize = 1024;
+ int msgCnt = 1;
+ if (args.length > 7) {
+ msgSize = Integer.parseInt(args[7]);
+ msgCnt = Integer.parseInt(args[8]);
+ }
+
+ String managerAddr = "http://" + managerIp + ":" + managerPort;
+
+ HttpMsgSenderConfig dataProxyConfig =
+ new HttpMsgSenderConfig(managerAddr, groupId, secretId,
secretKey);
+ InLongHttpMsgSender messageSender = new
InLongHttpMsgSender(dataProxyConfig);
+
+ ProcessResult procResult = new ProcessResult();
+ if (!messageSender.start(procResult)) {
+ System.out.println("Start http sender failure: process result=" +
procResult);
+ }
+
+ System.out.println("InLongHttpMsgSender start, nodes="
+ + messageSender.getProxyNodeInfos());
+
+ ExampleUtils.sendHttpMessages(messageSender, true, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendHttpMessages(messageSender, true, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendHttpMessages(messageSender, false, false,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+ ExampleUtils.sendHttpMessages(messageSender, false, true,
+ groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
+
+ ProxyUtils.sleepSomeTime(10000L);
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
index 51452d528a..382ba58ad8 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/http/InternalHttpSender.java
@@ -52,8 +52,9 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+@Deprecated
/**
- * internal http sender
+ * Replace by InLongHttpMsgSender
*/
public class InternalHttpSender {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
index 1b5653bc46..bd06d38b7f 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/HttpProxySender.java
@@ -37,8 +37,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+@Deprecated
/**
* http sender
+ * Replace by InLongHttpMsgSender
*/
public class HttpProxySender extends Thread {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java
new file mode 100644
index 0000000000..e58644fb29
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpAsyncObj.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.http;
+
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+
+/**
+ * HTTP Asynchronously Object class
+ *
+ * Used to carry the reported message content
+ */
+public class HttpAsyncObj {
+
+ private final HttpEventInfo httpEvent;
+ private final MsgSendCallback callback;
+ private final long rptMs;
+
+ public HttpAsyncObj(HttpEventInfo httpEvent, MsgSendCallback callback) {
+ this.httpEvent = httpEvent;
+ this.callback = callback;
+ this.rptMs = System.currentTimeMillis();
+ }
+
+ public HttpEventInfo getHttpEvent() {
+ return httpEvent;
+ }
+
+ public MsgSendCallback getCallback() {
+ return callback;
+ }
+
+ public long getRptMs() {
+ return rptMs;
+ }
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
new file mode 100644
index 0000000000..4e875359cf
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpClientMgr.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.network.http;
+
+import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.config.HostInfo;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
+import org.apache.inlong.sdk.dataproxy.utils.HttpUtils;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * HTTP Client Manager class
+ *
+ * Used to manage HTTP clients, including periodically selecting proxy nodes,
+ * finding available nodes when reporting messages, maintaining inflight
message
+ * sending status, finding responses to corresponding requests, etc.
+ */
+public class HttpClientMgr implements ClientMgr {
+
+ private static final Logger logger =
LoggerFactory.getLogger(HttpClientMgr.class);
+ private static final LogCounter updConExptCnt = new LogCounter(10, 100000,
60 * 1000L);
+ private static final LogCounter sendMsgExptCnt = new LogCounter(10,
100000, 60 * 1000L);
+ private static final LogCounter asyncSendExptCnt = new LogCounter(10,
100000, 60 * 1000L);
+
+ private final BaseSender sender;
+ private final HttpMsgSenderConfig httpConfig;
+ private CloseableHttpClient httpClient;
+ private final LinkedBlockingQueue<HttpAsyncObj> messageCache;
+ private final Semaphore asyncIdleCellCnt;
+ private final ExecutorService workerServices =
Executors.newCachedThreadPool();
+ private final AtomicBoolean shutDown = new AtomicBoolean(false);
+ // meta info
+ private ConcurrentHashMap<String, HostInfo> usingNodeMaps = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Long> connFailNodeMap = new
ConcurrentHashMap<>();
+ // current using nodes
+ private List<String> activeNodes = new ArrayList<>();
+ private volatile long lastUpdateTime = -1;
+ // node select index
+ private final AtomicInteger reqSendIndex = new AtomicInteger(0);
+
+ public HttpClientMgr(BaseSender sender, HttpMsgSenderConfig httpConfig) {
+ this.sender = sender;
+ this.httpConfig = httpConfig;
+ this.messageCache = new
LinkedBlockingQueue<>(httpConfig.getHttpAsyncRptCacheSize());
+ this.asyncIdleCellCnt = new
Semaphore(httpConfig.getHttpAsyncRptCacheSize(), true);
+ }
+
+ @Override
+ public boolean start(ProcessResult procResult) {
+ // build http client
+ if (!HttpUtils.constructHttpClient(httpConfig.isRptDataByHttps(),
+ httpConfig.getHttpSocketTimeoutMs(),
httpConfig.getHttpConTimeoutMs(),
+ httpConfig.getTlsVersion(), procResult)) {
+ return false;
+ }
+ this.httpClient = (CloseableHttpClient) procResult.getRetData();
+ // build async report workers
+ for (int i = 0; i < httpConfig.getHttpAsyncRptWorkerNum(); i++) {
+ workerServices.execute(new HttpAsyncReportWorker(i));
+ }
+ logger.info("ClientMgr({}) started!", this.sender.getSenderId());
+ return procResult.setSuccess();
+ }
+
+ /**
+ * close resources
+ */
+ @Override
+ public void stop() {
+ if (!this.shutDown.compareAndSet(false, true)) {
+ return;
+ }
+ int remainCnt = 0;
+ if (!messageCache.isEmpty()) {
+ long startTime = System.currentTimeMillis();
+ while (!messageCache.isEmpty()) {
+ if (System.currentTimeMillis() - startTime >=
httpConfig.getHttpCloseWaitPeriodMs()) {
+ break;
+ }
+ ProxyUtils.sleepSomeTime(100L);
+ }
+ remainCnt = messageCache.size();
+ messageCache.clear();
+ }
+ workerServices.shutdown();
+ if (httpClient != null) {
+ try {
+ httpClient.close();
+ } catch (Throwable ignore) {
+ //
+ }
+ }
+ logger.info("ClientMgr({}) stopped, remain ({}) messages discarded!",
+ this.sender.getSenderId(), remainCnt);
+ }
+
+ @Override
+ public int getInflightMsgCnt() {
+ return this.messageCache.size();
+ }
+
+ @Override
+ public int getActiveNodeCnt() {
+ return activeNodes.size();
+ }
+
+ @Override
+ public void updateProxyInfoList(boolean nodeChanged,
ConcurrentHashMap<String, HostInfo> hostInfoMap) {
+ if (hostInfoMap.isEmpty() || this.shutDown.get()) {
+ return;
+ }
+ long curTime = System.currentTimeMillis();
+ try {
+ // shuffle candidate nodes
+ List<HostInfo> candidateNodes = new
ArrayList<>(hostInfoMap.size());
+ candidateNodes.addAll(hostInfoMap.values());
+ Collections.sort(candidateNodes);
+ Collections.shuffle(candidateNodes);
+ int curTotalCnt = candidateNodes.size();
+ int needActiveCnt = Math.min(httpConfig.getAliveConnections(),
curTotalCnt);
+ // build next step nodes
+ Long lstFailTime;
+ int maxCycleCnt = 3;
+ this.connFailNodeMap.clear();
+ List<String> realHosts = new ArrayList<>();
+ ConcurrentHashMap<String, HostInfo> tmpNodeMaps = new
ConcurrentHashMap<>();
+ do {
+ int selectCnt = 0;
+ long selectTime = System.currentTimeMillis();
+ for (HostInfo hostInfo : candidateNodes) {
+ if (realHosts.contains(hostInfo.getReferenceName())) {
+ continue;
+ }
+ lstFailTime =
this.connFailNodeMap.get(hostInfo.getReferenceName());
+ if (lstFailTime != null
+ && selectTime - lstFailTime <=
httpConfig.getHttpNodeReuseWaitIfFailMs()) {
+ continue;
+ }
+ tmpNodeMaps.put(hostInfo.getReferenceName(), hostInfo);
+ realHosts.add(hostInfo.getReferenceName());
+ if (lstFailTime != null) {
+
this.connFailNodeMap.remove(hostInfo.getReferenceName());
+ }
+ if (++selectCnt >= needActiveCnt) {
+ break;
+ }
+ }
+ if (!realHosts.isEmpty()) {
+ break;
+ }
+ ProxyUtils.sleepSomeTime(1000L);
+ } while (--maxCycleCnt > 0);
+ // update active nodes
+ if (realHosts.isEmpty()) {
+ if (nodeChanged) {
+ logger.error("ClientMgr({}) changed nodes, but all nodes
failure, nodes={}, failNodes={}!",
+ this.sender.getSenderId(), candidateNodes,
connFailNodeMap);
+ } else {
+ logger.error("ClientMgr({}) re-choose nodes, but all nodes
failure, nodes={}, failNodes={}!",
+ this.sender.getSenderId(), candidateNodes,
connFailNodeMap);
+ }
+ } else {
+ this.lastUpdateTime = System.currentTimeMillis();
+ this.usingNodeMaps = tmpNodeMaps;
+ this.activeNodes = realHosts;
+ if (nodeChanged) {
+ logger.info("ClientMgr({}) changed nodes, wast {}ms,
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+ this.sender.getSenderId(),
(System.currentTimeMillis() - curTime),
+ needActiveCnt, realHosts.size(), realHosts,
connFailNodeMap.keySet());
+ } else {
+ logger.info("ClientMgr({}) re-choose nodes, wast {}ms,
nodeCnt=(r:{}-a:{}), actives={}, fail={}",
+ this.sender.getSenderId(),
(System.currentTimeMillis() - curTime),
+ needActiveCnt, realHosts.size(), realHosts,
connFailNodeMap.keySet());
+ }
+ }
+ } catch (Throwable ex) {
+ if (updConExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) update nodes throw exception",
+ this.sender.getSenderId(), ex);
+ }
+ }
+ }
+
+ public boolean asyncSendMessage(HttpAsyncObj asyncObj, ProcessResult
procResult) {
+ if (this.shutDown.get()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ List<String> curNodes = this.activeNodes;
+ if (curNodes.isEmpty()) {
+ return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+ }
+ if (!this.asyncIdleCellCnt.tryAcquire()) {
+ return procResult.setFailResult(ErrorCode.HTTP_ASYNC_POOL_FULL);
+ }
+ boolean released = false;
+ try {
+ if (!this.messageCache.offer(asyncObj)) {
+ this.asyncIdleCellCnt.release();
+ released = true;
+ return
procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_FAIL);
+ }
+ return procResult.setSuccess();
+ } catch (Throwable ex) {
+ if (!released) {
+ this.asyncIdleCellCnt.release();
+ }
+ if (asyncSendExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) async offer event exception",
this.sender.getSenderId(), ex);
+ }
+ return
procResult.setFailResult(ErrorCode.HTTP_ASYNC_OFFER_EXCEPTION, ex.getMessage());
+ }
+ }
+
+ /**
+ * send message to remote nodes
+ */
+ public boolean sendMessage(HttpEventInfo httpEvent, ProcessResult
procResult) {
+ if (this.shutDown.get()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ List<String> curNodes = this.activeNodes;
+ int curNodeSize = curNodes.size();
+ if (curNodeSize == 0) {
+ return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+ }
+ String curNode;
+ HostInfo hostInfo;
+ Long lstFailTime;
+ int nullNodeCnt = 0;
+ HostInfo back1thNode = null;
+ long nodeSelectTime = System.currentTimeMillis();
+ int startPos = reqSendIndex.getAndIncrement();
+ for (int index = 0; index < curNodeSize; index++) {
+ curNode = curNodes.get(Math.abs(startPos++) % curNodeSize);
+ hostInfo = usingNodeMaps.get(curNode);
+ if (hostInfo == null) {
+ nullNodeCnt++;
+ continue;
+ }
+ lstFailTime = connFailNodeMap.get(hostInfo.getReferenceName());
+ if (lstFailTime != null) {
+ if (nodeSelectTime - lstFailTime <=
httpConfig.getHttpNodeReuseWaitIfFailMs()) {
+ back1thNode = hostInfo;
+ continue;
+ }
+ connFailNodeMap.remove(hostInfo.getReferenceName(),
lstFailTime);
+ }
+ return innSendMsgByHttp(httpEvent, hostInfo, procResult);
+ }
+ if (nullNodeCnt == curNodeSize) {
+ return procResult.setFailResult(ErrorCode.EMPTY_ACTIVE_NODE_SET);
+ }
+ if (back1thNode != null) {
+ return innSendMsgByHttp(httpEvent, back1thNode, procResult);
+ }
+ return procResult.setFailResult(ErrorCode.NO_VALID_REMOTE_NODE);
+ }
+
+ /**
+ * send request to DataProxy over http
+ */
+ private boolean innSendMsgByHttp(HttpEventInfo httpEvent, HostInfo
hostInfo, ProcessResult procResult) {
+ String rmtRptUrl = (httpConfig.isRptDataByHttps() ?
SdkConsts.PREFIX_HTTPS : SdkConsts.PREFIX_HTTP)
+ + hostInfo.getReferenceName()
+ + SdkConsts.DATAPROXY_REPORT_METHOD;
+ if (!buildFormUrlPost(rmtRptUrl, httpEvent, procResult)) {
+ return false;
+ }
+ HttpPost httpPost = (HttpPost) procResult.getRetData();
+ CloseableHttpResponse response = null;
+ try {
+ response = httpClient.execute(httpPost);
+ String returnStr = EntityUtils.toString(response.getEntity());
+ int returnCode = response.getStatusLine().getStatusCode();
+ if (HttpStatus.SC_OK != returnCode) {
+ if (sendMsgExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) report event failure,
errCode={}, returnStr={}",
+ this.sender.getSenderId(), returnCode, returnStr);
+ }
+ if (response.getStatusLine().getStatusCode() >= 500) {
+ this.connFailNodeMap.put(hostInfo.getReferenceName(),
System.currentTimeMillis());
+ }
+ return procResult.setFailResult(ErrorCode.RMT_RETURN_FAILURE,
+ response.getStatusLine().getStatusCode() + ":" +
returnStr);
+ }
+ if (StringUtils.isBlank(returnStr)) {
+ return
procResult.setFailResult(ErrorCode.RMT_RETURN_BLANK_CONTENT);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("success to report event, url={}, result={}",
+ rmtRptUrl, returnStr);
+ }
+ JsonObject jsonResponse =
JsonParser.parseString(returnStr).getAsJsonObject();
+ JsonElement codeElement = jsonResponse.get("code");
+ JsonElement msgElement = jsonResponse.get("msg");
+ if (codeElement != null) {
+ int errCode = codeElement.getAsInt();
+ if (errCode == DataProxyErrCode.SUCCESS.getErrCode()) {
+ return procResult.setSuccess();
+ } else {
+ return
procResult.setFailResult(ErrorCode.DP_RETURN_FAILURE,
+ errCode + ":" + (msgElement != null ?
msgElement.getAsString() : ""));
+ }
+ }
+ return procResult.setFailResult(ErrorCode.DP_RETURN_UNKNOWN_ERROR,
returnStr);
+ } catch (Throwable ex) {
+ if (sendMsgExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) report event exception, url={}",
+ this.sender.getSenderId(), rmtRptUrl, ex);
+ }
+ return procResult.setFailResult(ErrorCode.HTTP_VISIT_DP_EXCEPTION,
ex.getMessage());
+ } finally {
+ if (httpPost != null) {
+ httpPost.releaseConnection();
+ }
+ if (response != null) {
+ try {
+ response.close();
+ } catch (Throwable ex) {
+ if (sendMsgExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) close response exception,
url={}",
+ this.sender.getSenderId(), rmtRptUrl, ex);
+ }
+ }
+ }
+ }
+ }
+
+ private boolean buildFormUrlPost(
+ String rmtRptUrl, HttpEventInfo httpEvent, ProcessResult
procResult) {
+ ArrayList<BasicNameValuePair> contents = new ArrayList<>();
+ try {
+ HttpPost httpPost = new HttpPost(rmtRptUrl);
+ httpPost.setHeader(HttpHeaders.CONNECTION,
+ HttpHeaderValues.CLOSE.toString());
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE,
+
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString());
+ contents.add(new BasicNameValuePair(AttributeConstants.GROUP_ID,
+ httpEvent.getGroupId()));
+ contents.add(new BasicNameValuePair(AttributeConstants.STREAM_ID,
+ httpEvent.getStreamId()));
+ contents.add(new BasicNameValuePair(AttributeConstants.DATA_TIME,
+ String.valueOf(httpEvent.getDtMs())));
+ contents.add(new BasicNameValuePair(SdkConsts.KEY_HTTP_FIELD_BODY,
+ StringUtils.join(httpEvent.getBodyList(),
httpConfig.getHttpEventsSeparator())));
+ contents.add(new
BasicNameValuePair(AttributeConstants.MESSAGE_COUNT,
+ String.valueOf(httpEvent.getMsgCnt())));
+ if (!httpConfig.isSepEventByLF()) {
+ contents.add(new
BasicNameValuePair(SdkConsts.KEY_HTTP_FIELD_DELIMITER,
+ httpConfig.getHttpEventsSeparator()));
+ }
+ String encodedContents = URLEncodedUtils.format(contents,
StandardCharsets.UTF_8);
+ httpPost.setEntity(new StringEntity(encodedContents));
+ if (logger.isDebugEnabled()) {
+ logger.debug("begin to post request to {}, encoded content is:
{}",
+ rmtRptUrl, encodedContents);
+ }
+ return procResult.setSuccess(httpPost);
+ } catch (Throwable ex) {
+ if (sendMsgExptCnt.shouldPrint()) {
+ logger.warn("ClientMgr({}) build form-url content failure,
content={}",
+ this.sender.getSenderId(), contents, ex);
+ }
+ return
procResult.setFailResult(ErrorCode.BUILD_FORM_CONTENT_EXCEPTION,
ex.getMessage());
+ }
+ }
+
+ /**
+ * check cache runner
+ */
+ private class HttpAsyncReportWorker implements Runnable {
+
+ private final String workerId;
+
+ public HttpAsyncReportWorker(int workerId) {
+ this.workerId = sender.getSenderId() + "-" + workerId;
+ }
+
+ @Override
+ public void run() {
+ long curTime = 0;
+ HttpAsyncObj asyncObj;
+ ProcessResult procResult = new ProcessResult();
+ logger.info("HttpAsyncReportWorker({}) started", this.workerId);
+ // if not shutdown or queue is not empty
+ while (!shutDown.get() || !messageCache.isEmpty()) {
+ while (!messageCache.isEmpty()) {
+ asyncObj = messageCache.poll();
+ if (asyncObj == null) {
+ continue;
+ }
+ try {
+ sendMessage(asyncObj.getHttpEvent(), procResult);
+ curTime = System.currentTimeMillis();
+ asyncObj.getCallback().onMessageAck(procResult);
+ } catch (Throwable ex) {
+ if (asyncSendExptCnt.shouldPrint()) {
+ logger.error("HttpAsync({}) report event
exception", workerId, ex);
+ }
+ } finally {
+ asyncIdleCellCnt.release();
+ }
+ }
+
ProxyUtils.sleepSomeTime(httpConfig.getHttpAsyncWorkerIdleWaitMs());
+ }
+ logger.info("HttpAsyncReportWorker({}) stopped", this.workerId);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java
similarity index 94%
rename from
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java
rename to
inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java
index 0db9261c70..9020b6b7d6 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/HttpContentType.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/http/HttpContentType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sdk.dataproxy.common;
+package org.apache.inlong.sdk.dataproxy.network.http;
/**
* HTTP Report Content Type enum
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
index c103bd31f4..3b6485eac3 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/BaseSender.java
@@ -83,6 +83,8 @@ public abstract class BaseSender implements ConfigHolder {
this.senderFactory = senderFactory;
this.factoryClusterIdKey = clusterIdKey;
this.senderId = configure.getDataRptProtocol() + "-" +
senderIdGen.incrementAndGet();
+ this.configManager = new ProxyConfigManager(this.senderId,
this.baseConfig, this);
+ this.configManager.setDaemon(true);
}
public boolean start(ProcessResult procResult) {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
index 6c7e5ac8b0..d05f4c821e 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpMsgSenderConfig.java
@@ -18,11 +18,11 @@
package org.apache.inlong.sdk.dataproxy.sender.http;
import org.apache.inlong.common.msg.AttributeConstants;
-import org.apache.inlong.sdk.dataproxy.common.HttpContentType;
import org.apache.inlong.sdk.dataproxy.common.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.ReportProtocol;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
+import org.apache.inlong.sdk.dataproxy.network.http.HttpContentType;
import org.apache.commons.lang3.StringUtils;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
new file mode 100644
index 0000000000..e7d0b44a34
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/InLongHttpMsgSender.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.sender.http;
+
+import org.apache.inlong.sdk.dataproxy.MsgSenderFactory;
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
+import org.apache.inlong.sdk.dataproxy.network.http.HttpAsyncObj;
+import org.apache.inlong.sdk.dataproxy.network.http.HttpClientMgr;
+import org.apache.inlong.sdk.dataproxy.sender.BaseSender;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
+
+/**
+ * HTTP(s) Message Sender class
+ *
+ * Used to define the HTTP(s) sender common methods
+ */
+public class InLongHttpMsgSender extends BaseSender implements HttpMsgSender {
+
+ protected static final LogCounter httpExceptCnt = new LogCounter(10,
100000, 60 * 1000L);
+ private final HttpClientMgr httpClientMgr;
+ private final HttpMsgSenderConfig httpConfig;
+
+ public InLongHttpMsgSender(HttpMsgSenderConfig configure) {
+ this(configure, null, null);
+ }
+
+ public InLongHttpMsgSender(HttpMsgSenderConfig configure, MsgSenderFactory
senderFactory, String clusterIdKey) {
+ super(configure, senderFactory, clusterIdKey);
+ this.httpConfig = (HttpMsgSenderConfig) baseConfig;
+ this.clientMgr = new HttpClientMgr(this, this.httpConfig);
+ this.httpClientMgr = (HttpClientMgr) clientMgr;
+ }
+
+ @Override
+ public boolean syncSendMessage(HttpEventInfo eventInfo, ProcessResult
procResult) {
+ validParamsNotNull(eventInfo, procResult);
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ if (this.isMetaInfoUnReady()) {
+ return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
+ }
+ // check package length
+ if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(),
procResult)) {
+ return false;
+ }
+ return httpClientMgr.sendMessage(eventInfo, procResult);
+ }
+
+ @Override
+ public boolean asyncSendMessage(HttpEventInfo eventInfo, MsgSendCallback
callback, ProcessResult procResult) {
+ validParamsNotNull(eventInfo, callback, procResult);
+ if (!this.isStarted()) {
+ return procResult.setFailResult(ErrorCode.SDK_CLOSED);
+ }
+ if (this.isMetaInfoUnReady()) {
+ return procResult.setFailResult(ErrorCode.NO_NODE_META_INFOS);
+ }
+ // check package length
+ if (!isValidPkgLength(eventInfo, this.getAllowedPkgLength(),
procResult)) {
+ return false;
+ }
+ return httpClientMgr.asyncSendMessage(new HttpAsyncObj(eventInfo,
callback), procResult);
+ }
+
+ @Override
+ public int getActiveNodeCnt() {
+ return httpClientMgr.getActiveNodeCnt();
+ }
+
+ @Override
+ public int getInflightMsgCnt() {
+ return httpClientMgr.getInflightMsgCnt();
+ }
+
+ private boolean isValidPkgLength(HttpEventInfo eventInfo, int allowedLen,
ProcessResult procResult) {
+ // Not valid if the maximum limit is less than or equal to 0
+ if (allowedLen < 0) {
+ return true;
+ }
+ int eventLen = eventInfo.getBodySize()
+ + eventInfo.getGroupId().length()
+ + eventInfo.getStreamId().length()
+ + String.valueOf(eventInfo.getDtMs()).length();
+ // Reserve space for attribute
+ if (eventLen > allowedLen - SdkConsts.RESERVED_ATTRIBUTE_LENGTH) {
+ String errMsg = String.format("OverMaxLen: content length(%d) >
allowedLen(%d) - fixedLen(%d)",
+ eventLen, allowedLen, SdkConsts.RESERVED_ATTRIBUTE_LENGTH);
+ if (httpExceptCnt.shouldPrint()) {
+ logger.warn(errMsg);
+ }
+ return
procResult.setFailResult(ErrorCode.REPORT_INFO_EXCEED_MAX_LEN, errMsg);
+ }
+ return true;
+ }
+
+ private void validParamsNotNull(HttpEventInfo eventInfo, ProcessResult
procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ }
+
+ private void validParamsNotNull(HttpEventInfo eventInfo, MsgSendCallback
callback, ProcessResult procResult) {
+ if (eventInfo == null) {
+ throw new NullPointerException("eventInfo is null");
+ }
+ if (callback == null) {
+ throw new NullPointerException("callback is null");
+ }
+ if (procResult == null) {
+ throw new NullPointerException("procResult is null");
+ }
+ }
+
+}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java
new file mode 100644
index 0000000000..228ee59a24
--- /dev/null
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/HttpUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.utils;
+
+import org.apache.inlong.sdk.dataproxy.common.ErrorCode;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContexts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+
+/**
+ * Http(s) Utils class
+ *
+ * Used to place public processing functions related to HTTP(s)
+ */
+public class HttpUtils {
+
+ private static final Logger logger =
LoggerFactory.getLogger(HttpUtils.class);
+ private static final LogCounter exceptCnt = new LogCounter(10, 200000, 60
* 1000L);
+
+ public static boolean constructHttpClient(boolean rptByHttps,
+ int socketTimeoutMs, int conTimeoutMs, String tlsVer,
ProcessResult procResult) {
+ CloseableHttpClient httpClient;
+ RequestConfig requestConfig = RequestConfig.custom()
+ .setSocketTimeout(socketTimeoutMs)
+ .setConnectTimeout(conTimeoutMs).build();
+ try {
+ if (rptByHttps) {
+ SSLContext sslContext = SSLContexts.custom().build();
+ SSLConnectionSocketFactory sslSf = new
SSLConnectionSocketFactory(sslContext,
+ new String[]{tlsVer}, null,
+
SSLConnectionSocketFactory.getDefaultHostnameVerifier());
+ httpClient = HttpClients.custom()
+ .setDefaultRequestConfig(requestConfig)
+ .setSSLSocketFactory(sslSf).build();
+ } else {
+ httpClient = HttpClientBuilder.create()
+ .setDefaultRequestConfig(requestConfig).build();
+ }
+ return procResult.setSuccess(httpClient);
+ } catch (Throwable ex) {
+ if (exceptCnt.shouldPrint()) {
+ logger.error("Build http client exception", ex);
+ }
+ return
procResult.setFailResult(ErrorCode.HTTP_BUILD_CLIENT_EXCEPTION,
ex.getMessage());
+ }
+ }
+}