This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new be94dcef55 [INLONG-8629][Agent] Fix sending invalid data to dataproxy
failed blocks normal data sending (#8630)
be94dcef55 is described below
commit be94dcef5597522b352506d86c36ad4a9e502793
Author: justinwwhuang <[email protected]>
AuthorDate: Sun Aug 6 15:16:36 2023 +0800
[INLONG-8629][Agent] Fix sending invalid data to dataproxy failed blocks
normal data sending (#8630)
---
.../inlong/agent/plugin/sinks/SenderManager.java | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 3447eb0058..554d6f5916 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -168,6 +168,21 @@ public class SenderManager {
shutdown = true;
resendExecutorService.shutdown();
sender.close();
+ cleanResendQueue();
+ }
+
+ private void cleanResendQueue() {
+ while (!resendQueue.isEmpty()) {
+ try {
+ AgentSenderCallback callback = resendQueue.poll(1,
TimeUnit.SECONDS);
+ if (callback != null) {
+ MemoryManager.getInstance()
+ .release(AGENT_GLOBAL_WRITER_PERMIT, (int)
callback.batchMessage.getTotalSize());
+ }
+ } catch (InterruptedException e) {
+ LOGGER.error("clean resend queue error{}", e.getMessage());
+ }
+ }
}
private AgentMetricItem getMetricItem(Map<String, String> otherDimensions)
{
@@ -222,6 +237,10 @@ public class SenderManager {
boolean suc = false;
while (!suc) {
try {
+ if (!resendQueue.isEmpty()) {
+ AgentUtils.silenceSleepInMs(retrySleepTime);
+ continue;
+ }
sender.asyncSendMessage(new AgentSenderCallback(batchMessage,
retry),
batchMessage.getDataList(), batchMessage.getGroupId(),
batchMessage.getStreamId(),
batchMessage.getDataTime(),
SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS,