This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fb24648e1 [INLONG-11892][Sort] SortStandalone support Negative 
Acknowledgment mechanism for delivery failures (#11896)
9fb24648e1 is described below

commit 9fb24648e12d1983f15db94fd2302d8db8a9fea4
Author: ChunLiang Lu <luchunli...@apache.org>
AuthorDate: Wed Jun 18 11:29:04 2025 +0800

    [INLONG-11892][Sort] SortStandalone support Negative Acknowledgment 
mechanism for delivery failures (#11896)
    
    * [INLONG-11892][Sort] SortStandalone support Negative Acknowledgment 
mechanism for delivery failures
    
    * set parameter
---
 .../org/apache/inlong/sdk/sort/api/SortClient.java |  3 ++
 .../inlong/sdk/sort/api/SortClientConfig.java      | 18 ++++++++
 .../apache/inlong/sdk/sort/api/TopicFetcher.java   |  7 +++
 .../fetcher/kafka/KafkaMultiTopicsFetcher.java     | 12 ++++++
 .../fetcher/kafka/KafkaSingleTopicFetcher.java     | 12 ++++++
 .../fetcher/pulsar/PulsarMultiTopicsFetcher.java   | 12 ++++++
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 50 ++++++++++++++++++++++
 .../sort/fetcher/tube/TubeSingleTopicFetcher.java  | 12 ++++++
 .../inlong/sdk/sort/impl/SortClientImpl.java       | 15 +++++++
 .../inlong/sdk/sort/impl/SortClientImplV2.java     | 15 +++++++
 .../config/holder/CommonPropertiesHolder.java      | 28 ++++++++++++
 .../standalone/channel/CacheMessageRecord.java     | 13 ++++++
 .../sort/standalone/channel/ProfileEvent.java      | 37 ++++++++++++++++
 .../sink/elasticsearch/EsCallbackListener.java     | 16 +++++--
 .../sink/kafka/KafkaProducerCluster.java           | 15 +++++--
 .../standalone/source/sortsdk/SortSdkSource.java   |  1 +
 16 files changed, 260 insertions(+), 6 deletions(-)

diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
index cb3af2f95d..3161fd4101 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java
@@ -27,4 +27,7 @@ public abstract class SortClient {
     public abstract boolean close();
 
     public abstract SortClientConfig getConfig();
+
+    public abstract void negativeAck(String msgKey, String msgOffset)
+            throws Exception;
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
index a354b1aced..cfbef57fbd 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java
@@ -78,6 +78,8 @@ public class SortClientConfig implements Serializable {
 
     private int threadPoolSize = 50;
 
+    private int sendFailPauseConsumerMinutes = 10;
+
     public SortClientConfig(
             String sortTaskId,
             String sortClusterName,
@@ -547,4 +549,20 @@ public class SortClientConfig implements Serializable {
         return subset;
     }
 
+    /**
+     * get sendFailPauseConsumerMinutes
+     * @return the sendFailPauseConsumerMinutes
+     */
+    public int getSendFailPauseConsumerMinutes() {
+        return sendFailPauseConsumerMinutes;
+    }
+
+    /**
+     * set sendFailPauseConsumerMinutes
+     * @param sendFailPauseConsumerMinutes the sendFailPauseConsumerMinutes to 
set
+     */
+    public void setSendFailPauseConsumerMinutes(int 
sendFailPauseConsumerMinutes) {
+        this.sendFailPauseConsumerMinutes = sendFailPauseConsumerMinutes;
+    }
+
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
index 682e678de0..8a762289df 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java
@@ -108,4 +108,11 @@ public interface TopicFetcher {
      * @return The result of update.
      */
     boolean updateTopics(List<InLongTopic> topics);
+
+    /**
+     * NegativeAck message by the given msgOffset.
+     * @param msgOffset Offset of message.
+     * @throws Exception
+     */
+    void negativeAck(String msgOffset) throws Exception;
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
index a24c270389..48aa9f9d4e 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
@@ -395,4 +395,16 @@ public class KafkaMultiTopicsFetcher extends 
MultiTopicsFetcher {
             }
         }
     }
+
+    /**
+     * negativeAck Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void negativeAck(String msgOffset) throws Exception {
+        this.sleepTime = 
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+                TimeUnit.MINUTES);
+        LOGGER.error("negativeAck,sleep {} minutes.", this.sleepTime);
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index 1c081a64ef..9ab2a0ebf8 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -310,4 +310,16 @@ public class KafkaSingleTopicFetcher extends 
SingleTopicFetcher {
             }
         }
     }
+
+    /**
+     * negativeAck Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void negativeAck(String msgOffset) throws Exception {
+        this.sleepTime = 
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+                TimeUnit.MINUTES);
+        LOGGER.error("negativeAck,topic:{}, sleep {} minutes.", 
this.topic.getTopicKey(), this.sleepTime);
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
index a346591e18..30601978e4 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
@@ -424,4 +424,16 @@ public class PulsarMultiTopicsFetcher extends 
MultiTopicsFetcher {
             }
         }
     }
+
+    /**
+     * negativeAck Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void negativeAck(String msgOffset) throws Exception {
+        this.sleepTime = 
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+                TimeUnit.MINUTES);
+        LOGGER.error("negativeAck,sleep {} minutes.", this.sleepTime);
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 91a4dcf5f1..6d7b8ab1e1 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -261,6 +261,7 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
 
                         if (sleepTime > 0) {
                             TimeUnit.MILLISECONDS.sleep(sleepTime);
+                            consumer.resume();
                         }
 
                         context.acquireRequestPermit();
@@ -330,4 +331,53 @@ public class PulsarSingleTopicFetcher extends 
SingleTopicFetcher {
             }
         }
     }
+
+    /**
+     * negativeAck Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void negativeAck(String msgOffset) throws Exception {
+        if (StringUtils.isEmpty(msgOffset)) {
+            return;
+        }
+        try {
+            if (consumer == null) {
+                context.addAckFail(topic, -1);
+                LOGGER.error("consumer == null {}", topic);
+                return;
+            }
+            MessageId messageId = offsetCache.get(msgOffset);
+            if (messageId == null) {
+                context.addAckFail(topic, -1);
+                LOGGER.error("messageId == null {}", topic);
+                return;
+            }
+            consumer.negativeAcknowledge(messageId);
+            offsetCache.remove(msgOffset);
+            context.addAckFail(topic, -1);
+            this.sleepTime = 
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+                    TimeUnit.MINUTES);
+            LOGGER.error("negativeAck,topic:{}, sleep {} minutes.", 
this.topic.getTopicKey(), this.sleepTime);
+            this.clearConsumer();
+        } catch (Exception e) {
+            context.addAckFail(topic, -1);
+            LOGGER.error(e.getMessage(), e);
+            throw e;
+        }
+    }
+
+    private void clearConsumer() {
+        try {
+            consumer.pause();
+            Message<byte[]> message = consumer.receive(1, 
TimeUnit.MILLISECONDS);
+            while (message != null) {
+                consumer.negativeAcknowledge(message);
+                message = consumer.receive(1, TimeUnit.MILLISECONDS);
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        }
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
index ae30d1c3d2..05427fc6a5 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java
@@ -289,4 +289,16 @@ public class TubeSingleTopicFetcher extends 
SingleTopicFetcher {
             }
         }
     }
+
+    /**
+     * negativeAck Offset
+     *
+     * @param msgOffset String
+     */
+    @Override
+    public void negativeAck(String msgOffset) throws Exception {
+        this.sleepTime = 
TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(),
+                TimeUnit.MINUTES);
+        LOG.error("negativeAck,topic:{}, sleep {} minutes.", 
this.topic.getTopicKey(), this.sleepTime);
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
index dd412913da..34a0eead69 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java
@@ -152,4 +152,19 @@ public class SortClientImpl extends SortClient {
             return false;
         }
     }
+
+    /**
+     * negativeAck offset to msgKey
+     *
+     * @param msgKey String
+     * @param msgOffset String
+     * @throws Exception
+     */
+    @Override
+    public void negativeAck(String msgKey, String msgOffset)
+            throws Exception {
+        logger.debug("negativeAck:{} offset:{}", msgKey, msgOffset);
+        TopicFetcher topicFetcher = getFetcher(msgKey);
+        topicFetcher.negativeAck(msgOffset);
+    }
 }
diff --git 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
index da90b64643..e9e90763d1 100644
--- 
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
+++ 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java
@@ -150,4 +150,19 @@ public class SortClientImplV2 extends SortClient {
             return false;
         }
     }
+
+    /**
+     * negativeAck offset to msgKey
+     *
+     * @param msgKey String
+     * @param msgOffset String
+     * @throws Exception
+     */
+    @Override
+    public void negativeAck(String msgKey, String msgOffset)
+            throws Exception {
+        logger.debug("negativeAck:{} offset:{}", msgKey, msgOffset);
+        TopicFetcher topicFetcher = getFetcher(msgKey);
+        topicFetcher.negativeAck(msgOffset);
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index 3cd897a56c..9bc8bb3a15 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 
@@ -42,12 +43,20 @@ public class CommonPropertiesHolder {
     public static final String KEY_SORT_SOURCE_ACKPOLICY = 
"sortSource.ackPolicy";
     public static final String KEY_USE_UNIFIED_CONFIGURATION = 
"useUnifiedConfiguration";
 
+    public static final String KEY_MAX_SENDFAIL_TIMES = "maxSendFailTimes";
+    public static final int DEFAULT_MAX_SENDFAIL_TIMES = 0;
+    public static final String KEY_SENDFAIL_PAUSE_CONSUMER_MIN = 
"sendFailPauseConsumerMin";
+    public static final int DEFAULT_SENDFAIL_PAUSE_CONSUMER_MIN = 10;
+
     private static Map<String, String> props;
     private static Context context;
 
     private static long auditFormatInterval = 60000L;
     private static AckPolicy ackPolicy;
 
+    private static AtomicInteger maxSendFailTimes = new AtomicInteger(-1);
+    private static AtomicInteger sendFailPauseConsumerMinutes = new 
AtomicInteger(-1);
+
     /**
      * init
      */
@@ -232,4 +241,23 @@ public class CommonPropertiesHolder {
         return getBoolean(KEY_USE_UNIFIED_CONFIGURATION, false);
     }
 
+    public static int getMaxSendFailTimes() {
+        int result = maxSendFailTimes.get();
+        if (result >= 0) {
+            return result;
+        }
+        int newResult = getInteger(KEY_MAX_SENDFAIL_TIMES, 
DEFAULT_MAX_SENDFAIL_TIMES);
+        maxSendFailTimes.compareAndSet(result, newResult);
+        return newResult;
+    }
+
+    public static int getSendFailPauseConsumerMinutes() {
+        int result = sendFailPauseConsumerMinutes.get();
+        if (result >= 0) {
+            return result;
+        }
+        int newResult = getInteger(KEY_SENDFAIL_PAUSE_CONSUMER_MIN, 
DEFAULT_SENDFAIL_PAUSE_CONSUMER_MIN);
+        sendFailPauseConsumerMinutes.compareAndSet(result, newResult);
+        return newResult;
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
index c49402714c..11d4022421 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java
@@ -73,6 +73,19 @@ public class CacheMessageRecord {
         return 0;
     }
 
+    /**
+     * negativeAck
+     */
+    public void negativeAck() {
+        if (client != null) {
+            try {
+                client.negativeAck(msgKey, offset);
+            } catch (Exception e) {
+                LOG.error(e.getMessage(), e);
+            }
+        }
+    }
+
     /**
      * ackMessage
      * @param ackToken ackToken
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
index fd1ac34ed2..6c5fe79b26 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java
@@ -25,6 +25,8 @@ import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.flume.event.SimpleEvent;
 
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 
@@ -42,6 +44,9 @@ public class ProfileEvent extends SimpleEvent {
     private CacheMessageRecord cacheRecord;
     private final int ackToken;
 
+    private final ConcurrentHashMap<String, String> headers = new 
ConcurrentHashMap<>();
+    private final AtomicInteger sendedTime = new AtomicInteger(0);
+
     /**
      * Constructor
      * @param headers
@@ -50,6 +55,7 @@ public class ProfileEvent extends SimpleEvent {
     public ProfileEvent(Map<String, String> headers, byte[] body) {
         super.setHeaders(headers);
         super.setBody(body);
+        this.headers.putAll(headers);
         this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID);
         this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID);
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
@@ -68,6 +74,7 @@ public class ProfileEvent extends SimpleEvent {
     public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord 
cacheRecord) {
         super.setHeaders(sdkMessage.getParams());
         super.setBody(sdkMessage.getBody());
+        this.headers.putAll(sdkMessage.getParams());
         this.inlongGroupId = sdkMessage.getInlongGroupId();
         this.inlongStreamId = sdkMessage.getInlongStreamId();
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
@@ -78,6 +85,15 @@ public class ProfileEvent extends SimpleEvent {
         this.ackToken = cacheRecord.getToken();
     }
 
+    /**
+     * get headers
+     * @return the headers
+     */
+    @Override
+    public Map<String, String> getHeaders() {
+        return this.headers;
+    }
+
     /**
      * get inlongGroupId
      * 
@@ -148,4 +164,25 @@ public class ProfileEvent extends SimpleEvent {
             cacheRecord.ackMessage(ackToken);
         }
     }
+
+    /**
+     * negativeAck
+     */
+    public void negativeAck() {
+        if (cacheRecord != null) {
+            cacheRecord.negativeAck();
+        }
+    }
+
+    /**
+     * get sendedTime
+     * @return the sendedTime
+     */
+    public int getSendedTime() {
+        return sendedTime.get();
+    }
+
+    public void incrementSendedTime() {
+        this.sendedTime.incrementAndGet();
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
index a54c184204..7acc483351 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.elasticsearch.action.DocWriteRequest;
@@ -85,7 +86,12 @@ public class EsCallbackListener implements 
BulkProcessor.Listener {
             // is fail
             if (responseItem.isFailed()) {
                 context.addSendResultMetric(event, context.getTaskName(), 
false, sendTime);
-                context.backDispatchQueue(requestItem);
+                event.incrementSendedTime();
+                if (event.getSendedTime() <= 
CommonPropertiesHolder.getMaxSendFailTimes()) {
+                    context.backDispatchQueue(requestItem);
+                } else {
+                    event.negativeAck();
+                }
             } else {
                 context.addSendResultMetric(event, context.getTaskName(), 
true, sendTime);
                 context.releaseDispatchQueue(requestItem);
@@ -115,8 +121,12 @@ public class EsCallbackListener implements 
BulkProcessor.Listener {
             ProfileEvent event = requestItem.getEvent();
             long sendTime = requestItem.getSendTime();
             context.addSendResultMetric(event, context.getTaskName(), false, 
sendTime);
-            context.backDispatchQueue(requestItem);
+            event.incrementSendedTime();
+            if (event.getSendedTime() <= 
CommonPropertiesHolder.getMaxSendFailTimes()) {
+                context.backDispatchQueue(requestItem);
+            } else {
+                event.negativeAck();
+            }
         }
     }
-
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 72b8ccc5e4..d10d32d02e 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -230,7 +230,7 @@ public class KafkaProducerCluster implements LifecycleAware 
{
                                     tx.commit();
                                     profileEvent.ack();
                                 } else {
-                                    tx.rollback();
+                                    this.exceptionProcess(profileEvent, tx);
                                 }
                             } else if (ex instanceof 
UnknownTopicOrPartitionException
                                     || !(ex instanceof RetriableException)) {
@@ -238,7 +238,7 @@ public class KafkaProducerCluster implements LifecycleAware 
{
                                 tx.commit();
                                 profileEvent.ack();
                             } else {
-                                tx.rollback();
+                                this.exceptionProcess(profileEvent, tx);
                             }
                             LOG.error(String.format("send failed, topic is 
%s", topic), ex);
                             sinkContext.addSendResultMetric(profileEvent, 
topic, false, sendTime);
@@ -247,7 +247,7 @@ public class KafkaProducerCluster implements LifecycleAware 
{
                     });
             return true;
         } catch (Exception e) {
-            tx.rollback();
+            this.exceptionProcess(profileEvent, tx);
             tx.close();
             LOG.error(e.getMessage(), e);
             sinkContext.addSendResultMetric(profileEvent, topic, false, 
sendTime);
@@ -255,4 +255,13 @@ public class KafkaProducerCluster implements 
LifecycleAware {
         }
     }
 
+    private void exceptionProcess(ProfileEvent profileEvent, Transaction tx) {
+        profileEvent.incrementSendedTime();
+        if (profileEvent.getSendedTime() <= 
CommonPropertiesHolder.getMaxSendFailTimes()) {
+            tx.rollback();
+        } else {
+            tx.commit();
+            profileEvent.negativeAck();
+        }
+    }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
index ac4726f241..409f2ebe90 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java
@@ -185,6 +185,7 @@ public final class SortSdkSource extends AbstractSource
             clientConfig.setCallback(callback);
             Map<String, String> sortSdkParams = 
this.getSortClientConfigParameters();
             clientConfig.setParameters(sortSdkParams);
+            
clientConfig.setSendFailPauseConsumerMinutes(CommonPropertiesHolder.getSendFailPauseConsumerMinutes());
 
             // create SortClient
             String configType = CommonPropertiesHolder

Reply via email to