This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 031732a09f [INLONG-11546][SDK] Support async and sync report dirty data (#11547)
031732a09f is described below
commit 031732a09f80e2f7c2c42214a247be0981b4c8ea
Author: vernedeng <[email protected]>
AuthorDate: Tue Nov 26 20:01:32 2024 +0800
[INLONG-11546][SDK] Support async and sync report dirty data (#11547)
* [INLONG-11546][SDK] Support async and sync report dirty data
---
.../java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java | 7 +++++--
.../apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 80cc596c26..74cfcffa21 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -31,7 +31,6 @@ import java.net.InetAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
@Slf4j
@Builder
@@ -75,7 +74,11 @@ public class InlongSdkDirtySender {
}
public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws
InterruptedException {
- dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS);
+ dirtyDataQueue.put(messageWrapper);
+ }
+
+ public boolean sendDirtyMessageAsync(DirtyMessageWrapper messageWrapper) {
+ return dirtyDataQueue.offer(messageWrapper);
}
private void doSendDirtyMessage() {
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8e692a4c10..daec1c0694 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -74,7 +74,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
.data(dirtyMessage)
.build();
- dirtySender.sendDirtyMessage(wrapper);
+ dirtySender.sendDirtyMessageAsync(wrapper);
} catch (Throwable t) {
log.error("failed to send dirty message to inlong sdk", t);
if (!options.isIgnoreSideOutputErrors()) {