This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new be94dcef55 [INLONG-8629][Agent] Fix sending invalid data to dataproxy 
failed blocks normal data sending (#8630)
be94dcef55 is described below

commit be94dcef5597522b352506d86c36ad4a9e502793
Author: justinwwhuang <[email protected]>
AuthorDate: Sun Aug 6 15:16:36 2023 +0800

    [INLONG-8629][Agent] Fix sending invalid data to dataproxy failed blocks 
normal data sending (#8630)
---
 .../inlong/agent/plugin/sinks/SenderManager.java      | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 3447eb0058..554d6f5916 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -168,6 +168,21 @@ public class SenderManager {
         shutdown = true;
         resendExecutorService.shutdown();
         sender.close();
+        cleanResendQueue();
+    }
+
+    private void cleanResendQueue() {
+        while (!resendQueue.isEmpty()) {
+            try {
+                AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
+                if (callback != null) {
+                    MemoryManager.getInstance()
+                            .release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
callback.batchMessage.getTotalSize());
+                }
+            } catch (InterruptedException e) {
+                LOGGER.error("clean resend queue error{}", e.getMessage());
+            }
+        }
     }
 
     private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) 
{
@@ -222,6 +237,10 @@ public class SenderManager {
         boolean suc = false;
         while (!suc) {
             try {
+                if (!resendQueue.isEmpty()) {
+                    AgentUtils.silenceSleepInMs(retrySleepTime);
+                    continue;
+                }
                 sender.asyncSendMessage(new AgentSenderCallback(batchMessage, 
retry),
                         batchMessage.getDataList(), batchMessage.getGroupId(), 
batchMessage.getStreamId(),
                         batchMessage.getDataTime(), 
SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,

Reply via email to