This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6915519ac2 [INLONG-11475][SDK] Remove the timeout parameter in the
MessageSender class functions (#11476)
6915519ac2 is described below
commit 6915519ac2fa755bb35408399dcdc992c3c6af85
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Nov 11 08:58:52 2024 +0800
[INLONG-11475][SDK] Remove the timeout parameter in the MessageSender class
functions (#11476)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/agent/core/AgentStatusManager.java | 3 +-
.../inlong/agent/core/FileStaticManager.java | 3 +-
.../apache/inlong/agent/core/HeartbeatManager.java | 1 +
.../inlong/sdk/dataproxy/ConfigConstants.java | 15 ++--
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 96 ++++++++++++----------
.../apache/inlong/sdk/dataproxy/MessageSender.java | 23 ++----
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 52 ++++++++----
.../sdk/dataproxy/example/SendMsgThread.java | 4 +-
.../sdk/dataproxy/example/TcpClientExample.java | 4 +-
.../inlong/sdk/dataproxy/network/NettyClient.java | 5 +-
.../inlong/sdk/dataproxy/network/Sender.java | 16 ++--
.../sdk/dataproxy/pb/PbProtocolMessageSender.java | 74 +++++++----------
12 files changed, 148 insertions(+), 148 deletions(-)
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
index 307c79f1da..7292c1a577 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/AgentStatusManager.java
@@ -42,7 +42,6 @@ import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
@@ -168,7 +167,7 @@ public class AgentStatusManager {
INLONG_AGENT_SYSTEM,
INLONG_AGENT_STATUS,
AgentUtils.getCurrentTime(),
- "", 30, TimeUnit.SECONDS);
+ "");
if (ret != SendResult.OK) {
LOGGER.error("send status failed: ret {}", ret);
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
index abda6a2eab..774dee4247 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/FileStaticManager.java
@@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
@@ -135,7 +134,7 @@ public class FileStaticManager {
INLONG_AGENT_SYSTEM,
INLONG_FILE_STATIC,
AgentUtils.getCurrentTime(),
- "", 30, TimeUnit.SECONDS);
+ "");
if (ret != SendResult.OK) {
LOGGER.error("send static failed: ret {}", ret);
}
diff --git
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index d436a32636..ead9439114 100644
---
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -201,6 +201,7 @@ public class HeartbeatManager extends AbstractDaemon
implements AbstractHeartbea
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
+ proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new
DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
sender = new DefaultMessageSender(proxyClientConfig,
SHARED_FACTORY);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index 40d71d2859..26f8d131b4 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -17,8 +17,6 @@
package org.apache.inlong.sdk.dataproxy;
-import java.util.concurrent.TimeUnit;
-
public class ConfigConstants {
public static final String PROXY_SDK_VERSION = "1.2.11";
@@ -49,12 +47,13 @@ public class ConfigConstants {
public static final int MAX_LINE_CNT = 30;
- /* Default connection,connect timeout in milliseconds. */
- public static final long DEFAULT_CONNECT_TIMEOUT_MILLIS =
- TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);
-
- public static final long DEFAULT_REQUEST_TIMEOUT_MILLIS =
- TimeUnit.MILLISECONDS.convert(40, TimeUnit.SECONDS);
+ // connection timeout in milliseconds
+ public static final long VAL_DEF_CONNECT_TIMEOUT_MS = 20000L;
+ public static final long VAL_MIN_CONNECT_TIMEOUT_MS = 1L;
+ public static final long VAL_DEF_CONNECT_CLOSE_DELAY_MS = 500L;
+ // request timeout in milliseconds
+ public static final long VAL_DEF_REQUEST_TIMEOUT_MS = 10000L;
+ public static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
public static final int DEFAULT_SEND_BUFFER_SIZE = 16777216;
public static final int DEFAULT_RECEIVE_BUFFER_SIZE = 16777216;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 9e2c8c06b5..153c43b8db 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -230,9 +230,8 @@ public class DefaultMessageSender implements MessageSender {
return sendIndexResult;
}
- public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
- return sendMessage(body, groupId, streamId, dt, msgUUID, timeout,
timeUnit, false);
+ public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID) {
+ return sendMessage(body, groupId, streamId, dt, msgUUID, false);
}
/**
@@ -243,13 +242,11 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @return SendResult.OK means success
*/
- public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, boolean isProxySend) {
+ public SendResult sendMessage(byte[] body, String groupId,
+ String streamId, long dt, String msgUUID, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
@@ -271,8 +268,7 @@ public class DefaultMessageSender implements MessageSender {
new EncodeObject(Collections.singletonList(body), msgtype,
isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
- Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
- timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
@@ -286,13 +282,13 @@ public class DefaultMessageSender implements
MessageSender {
"groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cp=snappy"
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
- true, groupId), msgUUID, timeout, timeUnit);
+ true, groupId), msgUUID);
} else {
sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(Collections.singletonList(body),
"groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt
+ finalProxySend,
idGenerator.getNextId(), this.getMsgtype(),
- false, groupId), msgUUID, timeout, timeUnit);
+ false, groupId), msgUUID);
}
return attemptSendMessage(sendOperation);
@@ -302,8 +298,8 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
{
- return sendMessage(body, groupId, streamId, dt, msgUUID, timeout,
timeUnit, extraAttrMap, false);
+ Map<String, String> extraAttrMap) {
+ return sendMessage(body, groupId, streamId, dt, msgUUID, extraAttrMap,
false);
}
/**
@@ -314,14 +310,12 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap,
boolean isProxySend) {
+ Map<String, String> extraAttrMap, boolean isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -345,8 +339,7 @@ public class DefaultMessageSender implements MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId,
attrs.toString());
encodeObject.setSupportLF(isSupportLF);
- Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
- timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
@@ -355,13 +348,14 @@ public class DefaultMessageSender implements
MessageSender {
Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(
new EncodeObject(Collections.singletonList(body),
attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), true, groupId),
- msgUUID, timeout, timeUnit);
+ msgUUID);
return attemptSendMessage(sendOperation);
} else {
Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(
new EncodeObject(Collections.singletonList(body),
- attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
- msgUUID, timeout, timeUnit);
+ attrs.toString(), idGenerator.getNextId(),
+ this.getMsgtype(), false, groupId),
+ msgUUID);
return attemptSendMessage(sendOperation);
}
}
@@ -369,9 +363,8 @@ public class DefaultMessageSender implements MessageSender {
}
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
- return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout,
timeUnit, false);
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID) {
+ return sendMessage(bodyList, groupId, streamId, dt, msgUUID, false);
}
/**
@@ -382,13 +375,11 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @return SendResult.OK means success
*/
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, boolean isProxySend) {
+ public SendResult sendMessage(List<byte[]> bodyList,
+ String groupId, String streamId, long dt, String msgUUID, boolean
isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
@@ -408,8 +399,7 @@ public class DefaultMessageSender implements MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
- Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
- timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
@@ -422,14 +412,12 @@ public class DefaultMessageSender implements
MessageSender {
sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cp=snappy" + "&cnt="
+ bodyList.size() + finalProxySend,
- idGenerator.getNextId(), this.getMsgtype(),
- true, groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID);
} else {
sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cnt=" + bodyList.size()
+ finalProxySend,
- idGenerator.getNextId(), this.getMsgtype(),
- false, groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID);
}
return attemptSendMessage(sendOperation);
}
@@ -437,8 +425,32 @@ public class DefaultMessageSender implements MessageSender
{
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap) {
- return sendMessage(bodyList, groupId, streamId, dt, msgUUID, timeout,
timeUnit, extraAttrMap, false);
+ String msgUUID, Map<String, String> extraAttrMap) {
+ return sendMessage(bodyList, groupId, streamId, dt, msgUUID,
extraAttrMap, false);
+ }
+
+ @Override
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ String msgUUID, Map<String, String> extraAttrMap) throws
ProxysdkException {
+
+ }
+
+ @Override
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
+ String msgUUID) throws ProxysdkException {
+
+ }
+
+ @Override
+ public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
+ long dt, String msgUUID) throws ProxysdkException {
+
+ }
+
+ @Override
+ public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
+ long dt, String msgUUID, Map<String, String> extraAttrMap) throws
ProxysdkException {
+
}
/**
@@ -449,14 +461,12 @@ public class DefaultMessageSender implements
MessageSender {
* @param streamId streamId
* @param dt data report timestamp
* @param msgUUID msg uuid
- * @param timeout
- * @param timeUnit
* @param extraAttrMap extra attributes
* @param isProxySend true: dataproxy doesn't return response message
until data is sent to MQ
* @return SendResult.OK means success
*/
public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap, boolean isProxySend) {
+ String msgUUID, Map<String, String> extraAttrMap, boolean
isProxySend) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt) ||
!ProxyUtils.isAttrKeysValid(
extraAttrMap)) {
@@ -476,8 +486,7 @@ public class DefaultMessageSender implements MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId,
attrs.toString());
encodeObject.setSupportLF(isSupportLF);
- Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
- timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID);
return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
@@ -486,13 +495,12 @@ public class DefaultMessageSender implements
MessageSender {
attrs.append("&cp=snappy");
Function<Sender, SendResult> sendOperation =
(sender) -> sender.syncSendMessage(new
EncodeObject(bodyList, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID);
return attemptSendMessage(sendOperation);
} else {
Function<Sender, SendResult> sendOperation =
(sender) -> sender.syncSendMessage(new
EncodeObject(bodyList, attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout,
- timeUnit);
+ idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID);
return attemptSendMessage(sendOperation);
}
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
index 1b18096229..e980e65974 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java
@@ -23,7 +23,6 @@ import
org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
public interface MessageSender {
@@ -33,8 +32,7 @@ public interface MessageSender {
* @param body The data will be sent
*
*/
- SendResult sendMessage(byte[] body, String groupId, String streamId, long
dt, String msgUUID,
- long timeout, TimeUnit timeUnit);
+ SendResult sendMessage(byte[] body, String groupId, String streamId, long
dt, String msgUUID);
/**
* This method provides a synchronized function which you want to send
data without packing
@@ -45,8 +43,8 @@ public interface MessageSender {
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair
like attrKey,attrValue
*/
- SendResult sendMessage(byte[] body, String groupId, String streamId, long
dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
+ SendResult sendMessage(byte[] body, String groupId,
+ String streamId, long dt, String msgUUID, Map<String, String>
extraAttrMap);
/**
* This method provides a synchronized function which you want to send
data with packing
@@ -54,8 +52,7 @@ public interface MessageSender {
*
* @param bodyList The data will be sent,which is a collection consisting
of byte arrays
*/
- SendResult sendMessage(List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit);
+ SendResult sendMessage(List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID);
/**
* This method provides a synchronized function which you want to send
data with packing
@@ -66,8 +63,8 @@ public interface MessageSender {
* @param extraAttrMap The attributes you want to add,
* and each element of extraAttrMap contains a pair
like attrKey,attrValue
*/
- SendResult sendMessage(List<byte[]> bodyList, String groupId, String
streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap);
+ SendResult sendMessage(List<byte[]> bodyList, String groupId,
+ String streamId, long dt, String msgUUID, Map<String, String>
extraAttrMap);
/**
* This method provides a synchronized function which you want to send
data without packing
@@ -80,7 +77,6 @@ public interface MessageSender {
*/
void asyncSendMessage(SendMessageCallback callback,
byte[] body, String groupId, String streamId, long dt, String
msgUUID,
- long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException;
/**
@@ -91,8 +87,7 @@ public interface MessageSender {
* @param body The data will be sent
*/
void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String groupId, String streamId, long dt, String
msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException;
+ byte[] body, String groupId, String streamId, long dt, String
msgUUID) throws ProxysdkException;
/**
* This method provides an asynchronized function which you want to send
data with packing
@@ -101,8 +96,7 @@ public interface MessageSender {
* @param bodyList The data will be sent,which is a collection consisting
of byte arrays
*/
void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException;
+ List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID) throws ProxysdkException;
/**
* This method provides an asynchronized function which you want to send
data with packing
@@ -115,7 +109,6 @@ public interface MessageSender {
*/
void asyncSendMessage(SendMessageCallback callback,
List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
- long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap) throws ProxysdkException;
/**
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index c3253805cb..3338d866c2 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -59,14 +59,18 @@ public class ProxyClientConfig {
private MetricConfig metricConfig = new MetricConfig();
private int managerConnectionTimeout = 10000;
+ // http socket timeout in milliseconds
+ private int managerSocketTimeout = 30 * 1000;
+
private boolean readProxyIPFromLocal = false;
- /**
- * Default connection, handshake, and initial request timeout in
milliseconds.
- */
- private long connectTimeoutMillis;
- private long requestTimeoutMillis;
- private int managerSocketTimeout = 30 * 1000;
+ // connect timeout in milliseconds
+ private long connectTimeoutMs = ConfigConstants.VAL_DEF_CONNECT_TIMEOUT_MS;
+ // request timeout in milliseconds
+ private long requestTimeoutMs = ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS;
+ // connect close wait period in milliseconds
+ private long conCloseWaitPeriodMs =
+ ConfigConstants.VAL_DEF_REQUEST_TIMEOUT_MS +
ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS;
// configuration for http client
// whether discard old metric when cache is full.
@@ -117,8 +121,6 @@ public class ProxyClientConfig {
this.proxyUpdateIntervalMinutes =
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
this.proxyHttpUpdateIntervalMinutes =
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
- this.connectTimeoutMillis =
ConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
-
this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS);
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
this.loadBalance = loadBalance;
@@ -148,8 +150,6 @@ public class ProxyClientConfig {
this.proxyUpdateIntervalMinutes =
ConfigConstants.PROXY_UPDATE_INTERVAL_MINUTES;
this.proxyHttpUpdateIntervalMinutes =
ConfigConstants.PROXY_HTTP_UPDATE_INTERVAL_MINUTES;
this.proxyUpdateMaxRetry = ConfigConstants.PROXY_UPDATE_MAX_RETRY;
- this.connectTimeoutMillis =
ConfigConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
-
this.setRequestTimeoutMillis(ConfigConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS);
this.authSecretId = authSecretId;
this.authSecretKey = authSecretKey;
this.loadBalance = loadBalance;
@@ -299,20 +299,36 @@ public class ProxyClientConfig {
this.proxyUpdateMaxRetry = proxyUpdateMaxRetry;
}
- public long getConnectTimeoutMillis() {
- return connectTimeoutMillis;
+ public long getConnectTimeoutMs() {
+ return connectTimeoutMs;
+ }
+
+ public void setConnectTimeoutMs(long connectTimeoutMs) {
+ if (connectTimeoutMs >= ConfigConstants.VAL_MIN_CONNECT_TIMEOUT_MS) {
+ this.connectTimeoutMs = connectTimeoutMs;
+ }
}
- public void setConnectTimeoutMillis(long connectTimeoutMillis) {
- this.connectTimeoutMillis = connectTimeoutMillis;
+ public long getRequestTimeoutMs() {
+ return requestTimeoutMs;
}
- public long getRequestTimeoutMillis() {
- return requestTimeoutMillis;
+ public void setRequestTimeoutMs(long requestTimeoutMs) {
+ if (requestTimeoutMs >= ConfigConstants.VAL_MIN_REQUEST_TIMEOUT_MS) {
+ this.requestTimeoutMs = requestTimeoutMs;
+ this.conCloseWaitPeriodMs =
+ this.requestTimeoutMs +
ConfigConstants.VAL_DEF_CONNECT_CLOSE_DELAY_MS;
+ }
}
- public void setRequestTimeoutMillis(long requestTimeoutMillis) {
- this.requestTimeoutMillis = requestTimeoutMillis;
+ public long getConCloseWaitPeriodMs() {
+ return conCloseWaitPeriodMs;
+ }
+
+ public void setConCloseWaitPeriodMs(long conCloseWaitPeriodMs) {
+ if (conCloseWaitPeriodMs >= 0) {
+ this.conCloseWaitPeriodMs = conCloseWaitPeriodMs;
+ }
}
public String getRsaPubKeyUrl() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
index 4c51696be0..4658bb1a05 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/SendMsgThread.java
@@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
public class SendMsgThread extends Thread {
@@ -49,8 +48,7 @@ public class SendMsgThread extends Thread {
long startTime = System.currentTimeMillis();
SendResult result =
messageSender.sendMessage("hhhh".getBytes("utf8"),
- "b_test", "n_test1", 0,
String.valueOf(System.currentTimeMillis()), 1,
- TimeUnit.MILLISECONDS);
+ "b_test", "n_test1", 0,
String.valueOf(System.currentTimeMillis()));
long endTime = System.currentTimeMillis();
if (result == result.OK) {
logger.info("this msg is ok time {}", endTime - startTime);
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
index 55b6cf6d99..eda90bdbca 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
-import java.util.concurrent.TimeUnit;
public class TcpClientExample {
@@ -75,6 +74,7 @@ public class TcpClientExample {
}
dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
dataProxyConfig.setProtocolType(ProtocolType.TCP);
+ dataProxyConfig.setRequestTimeoutMs(20000L);
messageSender =
DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
messageSender.setMsgtype(msgType);
} catch (Exception e) {
@@ -88,7 +88,7 @@ public class TcpClientExample {
SendResult result = null;
try {
result = sender.sendMessage(messageBody.getBytes("utf8"),
inlongGroupId, inlongStreamId,
- 0, String.valueOf(dt), 20, TimeUnit.SECONDS);
+ 0, String.valueOf(dt));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
index cd38e5f895..f529730384 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/NettyClient.java
@@ -89,7 +89,7 @@ public class NettyClient {
try {
// Wait until the connection is built.
- awaitLatch.await(configure.getConnectTimeoutMillis(),
+ awaitLatch.await(configure.getConnectTimeoutMs(),
TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error("create connect exception! {}", e.getMessage());
@@ -123,8 +123,7 @@ public class NettyClient {
}
});
// Wait until the connection is close.
- awaitLatch.await(configure.getRequestTimeoutMillis(),
- TimeUnit.MILLISECONDS);
+ awaitLatch.await(configure.getConCloseWaitPeriodMs(),
TimeUnit.MILLISECONDS);
// Return if close this connection fail.
if (!future.isSuccess()) {
ret = false;
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index 50b3105b56..01ac56a53b 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -173,8 +173,9 @@ public class Sender {
currentBufferSize.decrementAndGet();
}
- private SendResult syncSendInternalMessage(NettyClient client,
EncodeObject encodeObject, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ExecutionException,
InterruptedException, TimeoutException {
+ private SendResult syncSendInternalMessage(NettyClient client,
+ EncodeObject encodeObject, String msgUUID)
+ throws ExecutionException, InterruptedException, TimeoutException {
if (client == null) {
return SendResult.NO_CONNECTION;
}
@@ -204,11 +205,12 @@ public class Sender {
encodeObject.setEncryptEntry(false, null, null);
}
encodeObject.setMsgUUID(msgUUID);
- SyncMessageCallable callable = new SyncMessageCallable(client,
encodeObject, timeout, timeUnit);
+ SyncMessageCallable callable = new SyncMessageCallable(client,
encodeObject,
+ configure.getRequestTimeoutMs(), TimeUnit.MILLISECONDS);
syncCallables.put(encodeObject.getMessageId(), callable);
Future<SendResult> future = threadPool.submit(callable);
- return future.get(timeout, timeUnit);
+ return future.get(configure.getRequestTimeoutMs(),
TimeUnit.MILLISECONDS);
}
/**
@@ -217,11 +219,9 @@ public class Sender {
*
* @param encodeObject
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @return
*/
- public SendResult syncSendMessage(EncodeObject encodeObject, String
msgUUID, long timeout, TimeUnit timeUnit) {
+ public SendResult syncSendMessage(EncodeObject encodeObject, String
msgUUID) {
if (configure.isEnableMetric()) {
metricWorker.recordNumByKey(encodeObject.getMessageId(),
encodeObject.getGroupId(),
encodeObject.getStreamId(), Utils.getLocalIp(),
encodeObject.getDt(),
@@ -230,7 +230,7 @@ public class Sender {
NettyClient client = clientMgr.getClient(clientMgr.getLoadBalance(),
encodeObject);
SendResult message = null;
try {
- message = syncSendInternalMessage(client, encodeObject, msgUUID,
timeout, timeUnit);
+ message = syncSendInternalMessage(client, encodeObject, msgUUID);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
LOGGER.error("send message error {} ", getExceptionStack(e));
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
index f0bd45d13e..fc584f5a5a 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.java
@@ -46,13 +46,16 @@ import java.util.concurrent.atomic.AtomicReference;
public class PbProtocolMessageSender implements MessageSender, Configurable {
public static final Logger LOG =
LoggerFactory.getLogger(PbProtocolMessageSender.class);
-
+ private static final String KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+ private static final long VAL_DEF_REQUEST_TIMEOUT_MS = 20000L;
+ private static final long VAL_MIN_REQUEST_TIMEOUT_MS = 1L;
private String name;
private String localIp;
private LifecycleState lifecycleState;
private Context context;
private BufferQueueChannel channel;
private ProxySdkSink sink;
+ private long requestTimeoutMs = VAL_DEF_REQUEST_TIMEOUT_MS;
/**
* Constructor
@@ -101,6 +104,10 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
this.sink.setName(name + "-sink");
this.sink.configure(context);
this.sink.setChannel(channel);
+ long tmpTimeoutMs = context.getLong(KEY_REQUEST_TIMEOUT_MS,
VAL_DEF_REQUEST_TIMEOUT_MS);
+ if (tmpTimeoutMs >= VAL_MIN_REQUEST_TIMEOUT_MS) {
+ this.requestTimeoutMs = tmpTimeoutMs;
+ }
}
/**
@@ -179,14 +186,11 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @return SendResult
*/
@Override
- public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID, long timeout,
- TimeUnit timeUnit) {
- return this.sendMessage(body, groupId, streamId, dt, msgUUID, timeout,
timeUnit, null);
+ public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID) {
+ return this.sendMessage(body, groupId, streamId, dt, msgUUID, null);
}
/**
@@ -197,14 +201,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @param extraAttrMap
* @return SendResult
*/
@Override
- public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID, long timeout,
- TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ public SendResult sendMessage(byte[] body, String groupId,
+ String streamId, long dt, String msgUUID, Map<String, String>
extraAttrMap) {
// prepare
SdkEvent sdkEvent = new SdkEvent();
sdkEvent.setInlongGroupId(groupId);
@@ -237,7 +239,7 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
this.put(profile);
// wait
try {
- boolean success = latch.await(timeout, timeUnit);
+ boolean success = latch.await(requestTimeoutMs,
TimeUnit.MILLISECONDS);
if (!success) {
refResult.set(SendResult.TIMEOUT);
}
@@ -256,14 +258,11 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @return SendResult
*/
@Override
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
- return this.sendMessage(bodyList, groupId, streamId, dt, msgUUID,
timeout, timeUnit, null);
+ public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID) {
+ return this.sendMessage(bodyList, groupId, streamId, dt, msgUUID,
null);
}
/**
@@ -274,14 +273,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @param extraAttrMap
* @return SendResult
*/
@Override
- public SendResult sendMessage(List<byte[]> bodyList, String groupId,
String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap)
{
+ public SendResult sendMessage(List<byte[]> bodyList,
+ String groupId, String streamId, long dt, String msgUUID,
Map<String, String> extraAttrMap) {
final AtomicReference<SendResult> refResult = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(bodyList.size());
// prepare
@@ -318,7 +315,7 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
this.putAll(events);
// wait
try {
- boolean success = latch.await(timeout, timeUnit);
+ boolean success = latch.await(requestTimeoutMs,
TimeUnit.MILLISECONDS);
if (!success) {
refResult.set(SendResult.TIMEOUT);
}
@@ -338,14 +335,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @param extraAttrMap
* @throws ProxysdkException
*/
@Override
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit, Map<String,
String> extraAttrMap)
+ String msgUUID, Map<String, String> extraAttrMap)
throws ProxysdkException {
SdkEvent sdkEvent = new SdkEvent();
sdkEvent.setInlongGroupId(groupId);
@@ -369,14 +364,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @throws ProxysdkException
*/
@Override
- public void asyncSendMessage(SendMessageCallback callback, byte[] body,
String groupId, String streamId, long dt,
- String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
- this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
timeout, timeUnit, null);
+ public void asyncSendMessage(SendMessageCallback callback, byte[] body,
+ String groupId, String streamId, long dt, String msgUUID) throws
ProxysdkException {
+ this.asyncSendMessage(callback, body, groupId, streamId, dt, msgUUID,
null);
}
/**
@@ -388,14 +381,12 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @throws ProxysdkException
*/
@Override
- public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
- long dt, String msgUUID, long timeout, TimeUnit timeUnit) throws
ProxysdkException {
- this.asyncSendMessage(callback, bodyList, groupId, streamId, dt,
msgUUID, timeout, timeUnit, null);
+ public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList,
+ String groupId, String streamId, long dt, String msgUUID) throws
ProxysdkException {
+ this.asyncSendMessage(callback, bodyList, groupId, streamId, dt,
msgUUID, null);
}
/**
@@ -407,15 +398,13 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
* @param streamId
* @param dt
* @param msgUUID
- * @param timeout
- * @param timeUnit
* @param extraAttrMap
* @throws ProxysdkException
*/
@Override
- public void asyncSendMessage(SendMessageCallback callback, List<byte[]>
bodyList, String groupId, String streamId,
- long dt, String msgUUID, long timeout, TimeUnit timeUnit,
Map<String, String> extraAttrMap)
- throws ProxysdkException {
+ public void asyncSendMessage(SendMessageCallback callback,
+ List<byte[]> bodyList, String groupId, String streamId, long dt,
String msgUUID,
+ Map<String, String> extraAttrMap) throws ProxysdkException {
List<CallbackProfile> events = new ArrayList<>(bodyList.size());
for (byte[] body : bodyList) {
SdkEvent sdkEvent = new SdkEvent();
@@ -445,8 +434,7 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
byte[] body, SendMessageCallback callback)
throws ProxysdkException {
- this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(), null, 0L, null,
- null);
+ this.asyncSendMessage(callback, body, inlongGroupId, inlongStreamId,
System.currentTimeMillis(), null, null);
}
/**
@@ -461,8 +449,8 @@ public class PbProtocolMessageSender implements
MessageSender, Configurable {
@Override
public void asyncSendMessage(String inlongGroupId, String inlongStreamId,
List<byte[]> bodyList,
SendMessageCallback callback) throws ProxysdkException {
- this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), null, 0L,
- null, null);
+ this.asyncSendMessage(callback, bodyList, inlongGroupId,
inlongStreamId, System.currentTimeMillis(), null,
+ null);
}
}