This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d6cd5ef93b [INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender (#11724)
d6cd5ef93b is described below
commit d6cd5ef93b5b989718bcc117829515b7c943d00e
Author: Goson Zhang <[email protected]>
AuthorDate: Fri Feb 7 19:26:19 2025 +0800
[INLONG-11719][SDK] Replace the Sender object in the InlongSdkDirtySender class with TcpMsgSender (#11724)
Co-authored-by: gosonzhang <[email protected]>
---
.../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 43 ++++++++++++----------
1 file changed, 23 insertions(+), 20 deletions(-)
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index af7a62fb6d..195df99d9c 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -17,9 +17,11 @@
package org.apache.inlong.sdk.dirtydata;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
+import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
+import org.apache.inlong.sdk.dataproxy.sender.MsgSendCallback;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo;
+import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import com.google.common.base.Preconditions;
@@ -48,7 +50,8 @@ public class InlongSdkDirtySender {
private boolean closed = false;
private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
- private DefaultMessageSender sender;
+ private TcpMsgSender sender;
+ private MsgSenderSingleFactory messageSenderFactory;
private Executor executor;
public void init() throws Exception {
@@ -57,14 +60,13 @@ public class InlongSdkDirtySender {
Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr
cannot be null");
Preconditions.checkNotNull(authId, "authId cannot be null");
Preconditions.checkNotNull(authKey, "authKey cannot be null");
-
+ // build sender configure
TcpMsgSenderConfig proxyClientConfig =
- new TcpMsgSenderConfig(true,
+ new TcpMsgSenderConfig(false,
inlongManagerAddr, inlongManagerPort, inlongGroupId,
authId, authKey);
- proxyClientConfig.setOnlyUseLocalProxyConfig(false);
- proxyClientConfig.setTotalAsyncCallbackSize(maxCallbackSize);
- this.sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
-
+ // build sender factory
+ this.messageSenderFactory = new MsgSenderSingleFactory();
+ this.sender =
this.messageSenderFactory.genTcpSenderByClusterId(proxyClientConfig);
this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize);
this.executor = Executors.newSingleThreadExecutor();
executor.execute(this::doSendDirtyMessage);
@@ -80,6 +82,7 @@ public class InlongSdkDirtySender {
}
private void doSendDirtyMessage() {
+ ProcessResult procResult = new ProcessResult();
while (!closed) {
try {
DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll();
@@ -93,30 +96,30 @@ public class InlongSdkDirtySender {
messageWrapper);
continue;
}
-
- sender.asyncSendMessage(inlongGroupId, inlongStreamId,
- messageWrapper.format().getBytes(), new
LogCallBack(messageWrapper));
-
+ if (!sender.asyncSendMessage(new TcpEventInfo(inlongGroupId,
inlongStreamId,
+ System.currentTimeMillis(), null,
messageWrapper.format().getBytes()),
+ new LogCallBack(messageWrapper), procResult)) {
+ dirtyDataQueue.offer(messageWrapper);
+ }
} catch (Throwable t) {
log.error("failed to send inlong dirty message", t);
if (!ignoreErrors) {
throw new RuntimeException("writing dirty message to
inlong sdk failed", t);
}
}
-
}
}
public void close() {
closed = true;
dirtyDataQueue.clear();
- if (sender != null) {
- sender.close();
+ if (messageSenderFactory != null) {
+ messageSenderFactory.shutdownAll();
}
}
@Getter
- class LogCallBack implements SendMessageCallback {
+ class LogCallBack implements MsgSendCallback {
private final DirtyMessageWrapper wrapper;
@@ -125,8 +128,8 @@ public class InlongSdkDirtySender {
}
@Override
- public void onMessageAck(SendResult result) {
- if (SendResult.OK != result) {
+ public void onMessageAck(ProcessResult result) {
+ if (!result.isSuccess()) {
dirtyDataQueue.offer(wrapper);
}
}