This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 00bb1d2e2d [INLONG-10464][SDK] InlongSDK support retry sending when
failed (#11144)
00bb1d2e2d is described below
commit 00bb1d2e2dbb375c63e3c0b96e3c190e27dbf6f6
Author: emptyOVO <[email protected]>
AuthorDate: Wed Oct 9 12:46:00 2024 +0800
[INLONG-10464][SDK] InlongSDK support retry sending when failed (#11144)
---
.../inlong/sdk/dataproxy/ConfigConstants.java | 2 +
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 126 ++++++++++++++++-----
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 8 ++
3 files changed, 105 insertions(+), 31 deletions(-)
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index d7bd34f948..45b7b4056d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -74,4 +74,6 @@ public class ConfigConstants {
public static String HTTP = "http://";
public static String HTTPS = "https://";
+ public static int DEFAULT_SENDER_MAX_ATTEMPT = 1;
+
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 7b655b29fa..8a833a69cb 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
public class DefaultMessageSender implements MessageSender {
@@ -65,6 +66,7 @@ public class DefaultMessageSender implements MessageSender {
private boolean isReport = false;
private boolean isSupportLF = false;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
+ private final int senderMaxAttempt;
public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
this(configure, null);
@@ -75,6 +77,7 @@ public class DefaultMessageSender implements MessageSender {
sender = new Sender(configure, selfDefineFactory);
groupId = configure.getInlongGroupId();
indexCol = new IndexCollectThread(storeIndex);
+ senderMaxAttempt = configure.getSenderMaxAttempt();
indexCol.start();
}
@@ -191,11 +194,39 @@ public class DefaultMessageSender implements
MessageSender {
return ConfigConstants.PROXY_SDK_VERSION;
}
+    /**
+     * Runs a synchronous send and repeats it until it reports
+     * {@code SendResult.OK} or {@code senderMaxAttempt} attempts have been made.
+     *
+     * <p>The operation is always applied at least once: a zero or negative
+     * {@code senderMaxAttempt} degrades to a single attempt instead of skipping
+     * the send entirely and handing {@code null} back to the caller.
+     *
+     * @param sendOperation the send to run against {@code sender}; it is
+     *        re-applied from scratch on every attempt
+     * @return {@code SendResult.OK} as soon as an attempt succeeds, otherwise
+     *         the result of the last attempt
+     */
+    private SendResult attemptSendMessage(Function<Sender, SendResult> sendOperation) {
+        int attempts = 0;
+        SendResult sendResult;
+        // do/while rather than while: the first send must not depend on the
+        // configured budget being >= 1.
+        do {
+            sendResult = sendOperation.apply(sender);
+            if (sendResult != null && sendResult.equals(SendResult.OK)) {
+                return sendResult;
+            }
+            attempts++;
+        } while (attempts < this.senderMaxAttempt);
+        return sendResult;
+    }
+
+    /**
+     * Runs a synchronous index send and repeats it until the returned string
+     * starts with {@code SendResult.OK.toString()} or {@code senderMaxAttempt}
+     * attempts have been made.
+     *
+     * <p>The operation is always applied at least once: a zero or negative
+     * {@code senderMaxAttempt} degrades to a single attempt instead of skipping
+     * the send entirely and handing {@code null} back to the caller.
+     *
+     * @param sendOperation the send to run against {@code sender}; it is
+     *        re-applied from scratch on every attempt
+     * @return the first result that starts with the OK marker, otherwise the
+     *         result of the last attempt
+     */
+    private String attemptSendMessageIndex(Function<Sender, String> sendOperation) {
+        final String okPrefix = SendResult.OK.toString();
+        int attempts = 0;
+        String sendIndexResult;
+        // do/while rather than while: the first send must not depend on the
+        // configured budget being >= 1.
+        do {
+            sendIndexResult = sendOperation.apply(sender);
+            if (sendIndexResult != null && sendIndexResult.startsWith(okPrefix)) {
+                return sendIndexResult;
+            }
+            attempts++;
+        } while (attempts < this.senderMaxAttempt);
+        return sendIndexResult;
+    }
+
@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String
msgUUID,
long timeout, TimeUnit timeUnit) {
- return sender.syncSendMessage(new EncodeObject(body, attributes,
- idGenerator.getNextId()), msgUUID, timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation =
+ (sender) -> sender.syncSendMessage(new EncodeObject(body,
attributes, idGenerator.getNextId()), msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
}
public SendResult sendMessage(byte[] body, String groupId, String
streamId, long dt, String msgUUID,
@@ -235,20 +266,31 @@ public class DefaultMessageSender implements
MessageSender {
EncodeObject encodeObject = new EncodeObject(body, msgtype,
isCompressEnd, isReport,
isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(),
groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
- return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
}
+ final String finalProxySend = proxySend;
+ final long finalDt = dt;
+ Function<Sender, SendResult> sendOperation;
if (isCompressEnd) {
- return sender.syncSendMessage(new EncodeObject(body,
"groupId=" + groupId + "&streamId="
- + streamId + "&dt=" + dt + "&cp=snappy" + proxySend,
idGenerator.getNextId(),
- this.getMsgtype(), true, groupId), msgUUID, timeout,
timeUnit);
+ sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(body,
+ "groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cp=snappy"
+ + finalProxySend,
+ idGenerator.getNextId(), this.getMsgtype(),
+ true, groupId), msgUUID, timeout, timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(body,
- "groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + dt + proxySend,
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID, timeout, timeUnit);
+ sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(body,
+ "groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt
+ + finalProxySend,
+ idGenerator.getNextId(), this.getMsgtype(),
+ false, groupId), msgUUID, timeout, timeUnit);
+
}
+ return attemptSendMessage(sendOperation);
}
return null;
@@ -294,18 +336,22 @@ public class DefaultMessageSender implements
MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId,
attrs.toString());
encodeObject.setSupportLF(isSupportLF);
- return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId).append("&dt=").append(dt);
if (isCompressEnd) {
attrs.append("&cp=snappy");
- return sender.syncSendMessage(new EncodeObject(body,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId),
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(new EncodeObject(body,
+ attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), true, groupId),
msgUUID, timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
} else {
- return sender.syncSendMessage(new EncodeObject(body,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId), msgUUID,
- timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(new EncodeObject(body,
+ attrs.toString(), idGenerator.getNextId(),
this.getMsgtype(), false, groupId),
+ msgUUID, timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
}
}
return null;
@@ -348,21 +394,30 @@ public class DefaultMessageSender implements
MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId, proxySend);
encodeObject.setSupportLF(isSupportLF);
- return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
if (isProxySend) {
proxySend = "&" + proxySend;
}
+ final long finalDt = dt;
+ final String finalProxySend = proxySend;
+ Function<Sender, SendResult> sendOperation;
if (isCompress) {
- return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cp=snappy" + "&cnt=" +
bodyList.size() + proxySend,
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId), msgUUID, timeout, timeUnit);
+ sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(bodyList,
+ "groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cp=snappy" + "&cnt="
+ + bodyList.size() + finalProxySend,
+ idGenerator.getNextId(), this.getMsgtype(),
+ true, groupId), msgUUID, timeout, timeUnit);
} else {
- return sender.syncSendMessage(new EncodeObject(bodyList,
"groupId=" + groupId + "&streamId=" + streamId
- + "&dt=" + dt + "&cnt=" + bodyList.size() + proxySend,
idGenerator.getNextId(),
- this.getMsgtype(),
+ sendOperation = (sender) -> sender.syncSendMessage(new
EncodeObject(bodyList,
+ "groupId=" + groupId + "&streamId=" + streamId +
"&dt=" + finalDt + "&cnt=" + bodyList.size()
+ + finalProxySend,
+ idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout, timeUnit);
}
+ return attemptSendMessage(sendOperation);
}
return null;
}
@@ -404,19 +459,24 @@ public class DefaultMessageSender implements
MessageSender {
isGroupIdTransfer, dt / 1000,
idGenerator.getNextInt(), groupId, streamId,
attrs.toString());
encodeObject.setSupportLF(isSupportLF);
- return sender.syncSendMessage(encodeObject, msgUUID, timeout,
timeUnit);
+ Function<Sender, SendResult> sendOperation = (sender) ->
sender.syncSendMessage(encodeObject, msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
} else if (msgtype == 3 || msgtype == 5) {
attrs.append("&groupId=").append(groupId).append("&streamId=").append(streamId)
.append("&dt=").append(dt).append("&cnt=").append(bodyList.size());
if (isCompress) {
attrs.append("&cp=snappy");
- return sender.syncSendMessage(new EncodeObject(bodyList,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), true,
groupId),
- msgUUID, timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation =
+ (sender) -> sender.syncSendMessage(new
EncodeObject(bodyList, attrs.toString(),
+ idGenerator.getNextId(), this.getMsgtype(),
true, groupId), msgUUID, timeout, timeUnit);
+ return attemptSendMessage(sendOperation);
} else {
- return sender.syncSendMessage(new EncodeObject(bodyList,
attrs.toString(),
- idGenerator.getNextId(), this.getMsgtype(), false,
groupId),
- msgUUID, timeout, timeUnit);
+ Function<Sender, SendResult> sendOperation =
+ (sender) -> sender.syncSendMessage(new
EncodeObject(bodyList, attrs.toString(),
+ idGenerator.getNextId(), this.getMsgtype(),
false, groupId), msgUUID, timeout,
+ timeUnit);
+ return attemptSendMessage(sendOperation);
}
}
return null;
@@ -807,7 +867,9 @@ public class DefaultMessageSender implements MessageSender {
isReport, isGroupIdTransfer, dt / 1000,
sid, groupId, streamId, attrs.toString(), "data", "");
encodeObject.setSupportLF(isSupportLF);
- return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout,
timeUnit);
+ Function<Sender, String> sendOperation = (sender) ->
sender.syncSendMessageIndex(encodeObject, msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessageIndex(sendOperation);
}
return null;
}
@@ -822,7 +884,9 @@ public class DefaultMessageSender implements MessageSender {
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(body, msgtype, false,
isReport,
isGroupIdTransfer, dt / 1000, sid, groupId, streamId, "",
messageKey, ip);
- return sender.syncSendMessageIndex(encodeObject, msgUUID, timeout,
timeUnit);
+ Function<Sender, String> sendOperation = (sender) ->
sender.syncSendMessageIndex(encodeObject, msgUUID,
+ timeout, timeUnit);
+ return attemptSendMessageIndex(sendOperation);
}
return null;
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index 05b439d7f1..f866b4b76d 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -100,6 +100,7 @@ public class ProxyClientConfig {
private LoadBalance loadBalance;
private int maxRetry;
+ private int senderMaxAttempt = ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT;
/* pay attention to the last url parameter ip */
public ProxyClientConfig(String localHost, boolean requestByHttp, String
managerIp,
@@ -548,4 +549,11 @@ public class ProxyClientConfig {
public void setMaxRetry(int maxRetry) {
this.maxRetry = maxRetry;
}
+    /**
+     * Returns the configured maximum number of attempts for a send. The field
+     * is initialised to {@code ConfigConstants.DEFAULT_SENDER_MAX_ATTEMPT}.
+     *
+     * @return the maximum number of send attempts
+     */
+    public int getSenderMaxAttempt() {
+        return senderMaxAttempt;
+    }
+
+    /**
+     * Sets the maximum number of attempts for a send. A value of 1 means the
+     * send is tried once and never retried.
+     *
+     * @param senderMaxAttempt the maximum number of send attempts
+     */
+    public void setSenderMaxAttempt(int senderMaxAttempt) {
+        this.senderMaxAttempt = senderMaxAttempt;
+    }
+
+    /**
+     * Alias of {@link #setSenderMaxAttempt(int)}. Kept because this was the
+     * originally published setter name; it does not match the
+     * {@code senderMaxAttempt} property, so new code should call
+     * {@link #setSenderMaxAttempt(int)} instead.
+     *
+     * @param senderMaxAttempt the maximum number of send attempts
+     */
+    public void setSenderAttempt(int senderMaxAttempt) {
+        setSenderMaxAttempt(senderMaxAttempt);
+    }
}