This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9fb24648e1 [INLONG-11892][Sort] SortStandalone support Negative Acknowledgment mechanism for delivery failures (#11896) 9fb24648e1 is described below commit 9fb24648e12d1983f15db94fd2302d8db8a9fea4 Author: ChunLiang Lu <luchunli...@apache.org> AuthorDate: Wed Jun 18 11:29:04 2025 +0800 [INLONG-11892][Sort] SortStandalone support Negative Acknowledgment mechanism for delivery failures (#11896) * [INLONG-11892][Sort] SortStandalone support Negative Acknowledgment mechanism for delivery failures * set parameter --- .../org/apache/inlong/sdk/sort/api/SortClient.java | 3 ++ .../inlong/sdk/sort/api/SortClientConfig.java | 18 ++++++++ .../apache/inlong/sdk/sort/api/TopicFetcher.java | 7 +++ .../fetcher/kafka/KafkaMultiTopicsFetcher.java | 12 ++++++ .../fetcher/kafka/KafkaSingleTopicFetcher.java | 12 ++++++ .../fetcher/pulsar/PulsarMultiTopicsFetcher.java | 12 ++++++ .../fetcher/pulsar/PulsarSingleTopicFetcher.java | 50 ++++++++++++++++++++++ .../sort/fetcher/tube/TubeSingleTopicFetcher.java | 12 ++++++ .../inlong/sdk/sort/impl/SortClientImpl.java | 15 +++++++ .../inlong/sdk/sort/impl/SortClientImplV2.java | 15 +++++++ .../config/holder/CommonPropertiesHolder.java | 28 ++++++++++++ .../standalone/channel/CacheMessageRecord.java | 13 ++++++ .../sort/standalone/channel/ProfileEvent.java | 37 ++++++++++++++++ .../sink/elasticsearch/EsCallbackListener.java | 16 +++++-- .../sink/kafka/KafkaProducerCluster.java | 15 +++++-- .../standalone/source/sortsdk/SortSdkSource.java | 1 + 16 files changed, 260 insertions(+), 6 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java index cb3af2f95d..3161fd4101 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClient.java @@ -27,4 +27,7 @@ public abstract class SortClient { public abstract boolean close(); public abstract SortClientConfig getConfig(); + + public abstract void negativeAck(String msgKey, String msgOffset) + throws Exception; } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java index a354b1aced..cfbef57fbd 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java @@ -78,6 +78,8 @@ public class SortClientConfig implements Serializable { private int threadPoolSize = 50; + private int sendFailPauseConsumerMinutes = 10; + public SortClientConfig( String sortTaskId, String sortClusterName, @@ -547,4 +549,20 @@ public class SortClientConfig implements Serializable { return subset; } + /** + * get sendFailPauseConsumerMinutes + * @return the sendFailPauseConsumerMinutes + */ + public int getSendFailPauseConsumerMinutes() { + return sendFailPauseConsumerMinutes; + } + + /** + * set sendFailPauseConsumerMinutes + * @param sendFailPauseConsumerMinutes the sendFailPauseConsumerMinutes to set + */ + public void setSendFailPauseConsumerMinutes(int sendFailPauseConsumerMinutes) { + this.sendFailPauseConsumerMinutes = sendFailPauseConsumerMinutes; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java index 682e678de0..8a762289df 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/TopicFetcher.java @@ -108,4 +108,11 @@ public interface TopicFetcher { * @return The result of update. */ boolean updateTopics(List<InLongTopic> topics); + + /** + * NegativeAck message by the given msgOffset. + * @param msgOffset Offset of message. + * @throws Exception + */ + void negativeAck(String msgOffset) throws Exception; } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java index a24c270389..48aa9f9d4e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java @@ -395,4 +395,16 @@ public class KafkaMultiTopicsFetcher extends MultiTopicsFetcher { } } } + + /** + * negativeAck Offset + * + * @param msgOffset String + */ + @Override + public void negativeAck(String msgOffset) throws Exception { + this.sleepTime = TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(), + TimeUnit.MINUTES); + LOGGER.error("negativeAck,sleep {} minutes.", this.sleepTime); + } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java index 1c081a64ef..9ab2a0ebf8 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java @@ -310,4 +310,16 @@ public class KafkaSingleTopicFetcher extends SingleTopicFetcher { } } } + + /** + * negativeAck Offset + * + * @param msgOffset String + */ + @Override + public void negativeAck(String msgOffset) throws Exception { + this.sleepTime = TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(), + TimeUnit.MINUTES); + LOGGER.error("negativeAck,topic:{}, sleep {} minutes.", this.topic.getTopicKey(), this.sleepTime); + } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java index a346591e18..30601978e4 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java @@ -424,4 +424,16 @@ public class PulsarMultiTopicsFetcher extends MultiTopicsFetcher { } } } + + /** + * negativeAck Offset + * + * @param msgOffset String + */ + @Override + public void negativeAck(String msgOffset) throws Exception { + this.sleepTime = TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(), + TimeUnit.MINUTES); + LOGGER.error("negativeAck,sleep {} minutes.", this.sleepTime); + } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java index 91a4dcf5f1..6d7b8ab1e1 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java @@ -261,6 +261,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { if (sleepTime > 0) { TimeUnit.MILLISECONDS.sleep(sleepTime); + consumer.resume(); } context.acquireRequestPermit(); @@ -330,4 +331,53 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { } } } + + /** + * negativeAck Offset + * + * @param msgOffset String + */ + @Override + public void negativeAck(String msgOffset) throws Exception { + if (StringUtils.isEmpty(msgOffset)) { + return; + } + try { + if (consumer == null) { + context.addAckFail(topic, -1); + LOGGER.error("consumer == null {}", topic); + return; + } + MessageId messageId = offsetCache.get(msgOffset); + if (messageId == null) { + context.addAckFail(topic, -1); + LOGGER.error("messageId == null {}", topic); + return; + } + consumer.negativeAcknowledge(messageId); + offsetCache.remove(msgOffset); + context.addAckFail(topic, -1); + this.sleepTime = TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(), + TimeUnit.MINUTES); + LOGGER.error("negativeAck,topic:{}, sleep {} minutes.", this.topic.getTopicKey(), this.sleepTime); + this.clearConsumer(); + } catch (Exception e) { + context.addAckFail(topic, -1); + LOGGER.error(e.getMessage(), e); + throw e; + } + } + + private void clearConsumer() { + try { + consumer.pause(); + Message<byte[]> message = consumer.receive(1, TimeUnit.MILLISECONDS); + while (message != null) { + consumer.negativeAcknowledge(message); + message = consumer.receive(1, TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java index ae30d1c3d2..05427fc6a5 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/tube/TubeSingleTopicFetcher.java @@ -289,4 +289,16 @@ public class TubeSingleTopicFetcher extends SingleTopicFetcher { } } } + + /** + * negativeAck Offset + * + * @param msgOffset String + */ + @Override + public void negativeAck(String msgOffset) throws Exception { + this.sleepTime = TimeUnit.MILLISECONDS.convert(context.getConfig().getSendFailPauseConsumerMinutes(), + TimeUnit.MINUTES); + LOG.error("negativeAck,topic:{}, sleep {} minutes.", this.topic.getTopicKey(), this.sleepTime); + } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java index dd412913da..34a0eead69 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java @@ -152,4 +152,19 @@ public class SortClientImpl extends SortClient { return false; } } + + /** + * negativeAck offset to msgKey + * + * @param msgKey String + * @param msgOffset String + * @throws Exception + */ + @Override + public void negativeAck(String msgKey, String msgOffset) + throws Exception { + logger.debug("negativeAck:{} offset:{}", msgKey, msgOffset); + TopicFetcher topicFetcher = getFetcher(msgKey); + topicFetcher.negativeAck(msgOffset); + } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java index da90b64643..e9e90763d1 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImplV2.java @@ -150,4 +150,19 @@ public class SortClientImplV2 extends SortClient { return false; } } + + /** + * negativeAck offset to msgKey + * + * @param msgKey String + * @param msgOffset String + * @throws Exception + */ + @Override + public void negativeAck(String msgKey, String msgOffset) + throws Exception { + logger.debug("negativeAck:{} offset:{}", msgKey, msgOffset); + TopicFetcher topicFetcher = getFetcher(msgKey); + topicFetcher.negativeAck(msgOffset); + } } diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java index 3cd897a56c..9bc8bb3a15 100644 --- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java +++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java @@ -28,6 +28,7 @@ import org.slf4j.Logger; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -42,12 +43,20 @@ public class CommonPropertiesHolder { public static final String KEY_SORT_SOURCE_ACKPOLICY = "sortSource.ackPolicy"; public static final String KEY_USE_UNIFIED_CONFIGURATION = "useUnifiedConfiguration"; + public static final String KEY_MAX_SENDFAIL_TIMES = "maxSendFailTimes"; + public static final int DEFAULT_MAX_SENDFAIL_TIMES = 0; + public static final String KEY_SENDFAIL_PAUSE_CONSUMER_MIN = "sendFailPauseConsumerMin"; + public static final int DEFAULT_SENDFAIL_PAUSE_CONSUMER_MIN = 10; + private static Map<String, String> props; private static Context context; private static long auditFormatInterval = 60000L; private static AckPolicy ackPolicy; + private static AtomicInteger maxSendFailTimes = new AtomicInteger(-1); + private static AtomicInteger sendFailPauseConsumerMinutes = new AtomicInteger(-1); + /** * init */ @@ -232,4 +241,23 @@ public class CommonPropertiesHolder { return getBoolean(KEY_USE_UNIFIED_CONFIGURATION, false); } + public static int getMaxSendFailTimes() { + int result = maxSendFailTimes.get(); + if (result >= 0) { + return result; + } + int newResult = getInteger(KEY_MAX_SENDFAIL_TIMES, DEFAULT_MAX_SENDFAIL_TIMES); + maxSendFailTimes.compareAndSet(result, newResult); + return newResult; + } + + public static int getSendFailPauseConsumerMinutes() { + int result = sendFailPauseConsumerMinutes.get(); + if (result >= 0) { + return result; + } + int newResult = getInteger(KEY_SENDFAIL_PAUSE_CONSUMER_MIN, DEFAULT_SENDFAIL_PAUSE_CONSUMER_MIN); + sendFailPauseConsumerMinutes.compareAndSet(result, newResult); + return newResult; + } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java index c49402714c..11d4022421 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/CacheMessageRecord.java @@ -73,6 +73,19 @@ public class CacheMessageRecord { return 0; } + /** + * negativeAck + */ + public void negativeAck() { + if (client != null) { + try { + client.negativeAck(msgKey, offset); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + /** * ackMessage * @param ackToken ackToken diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java index fd1ac34ed2..6c5fe79b26 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/ProfileEvent.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.math.NumberUtils; import org.apache.flume.event.SimpleEvent; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** * @@ -42,6 +44,9 @@ public class ProfileEvent extends SimpleEvent { private CacheMessageRecord cacheRecord; private final int ackToken; + private final ConcurrentHashMap<String, String> headers = new ConcurrentHashMap<>(); + private final AtomicInteger sendedTime = new AtomicInteger(0); + /** * Constructor * @param headers @@ -50,6 +55,7 @@ public class ProfileEvent extends SimpleEvent { public ProfileEvent(Map<String, String> headers, byte[] body) { super.setHeaders(headers); super.setBody(body); + this.headers.putAll(headers); this.inlongGroupId = headers.get(Constants.INLONG_GROUP_ID); this.inlongStreamId = headers.get(Constants.INLONG_STREAM_ID); this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); @@ -68,6 +74,7 @@ public class ProfileEvent extends SimpleEvent { public ProfileEvent(InLongMessage sdkMessage, CacheMessageRecord cacheRecord) { super.setHeaders(sdkMessage.getParams()); super.setBody(sdkMessage.getBody()); + this.headers.putAll(sdkMessage.getParams()); this.inlongGroupId = sdkMessage.getInlongGroupId(); this.inlongStreamId = sdkMessage.getInlongStreamId(); this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId); @@ -78,6 +85,15 @@ public class ProfileEvent extends SimpleEvent { this.ackToken = cacheRecord.getToken(); } + /** + * get headers + * @return the headers + */ + @Override + public Map<String, String> getHeaders() { + return this.headers; + } + /** * get inlongGroupId * @@ -148,4 +164,25 @@ public class ProfileEvent extends SimpleEvent { cacheRecord.ackMessage(ackToken); } } + + /** + * negativeAck + */ + public void negativeAck() { + if (cacheRecord != null) { + cacheRecord.negativeAck(); + } + } + + /** + * get sendedTime + * @return the sendedTime + */ + public int getSendedTime() { + return sendedTime.get(); + } + + public void incrementSendedTime() { + this.sendedTime.incrementAndGet(); + } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java index a54c184204..7acc483351 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsCallbackListener.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.standalone.sink.elasticsearch; import org.apache.inlong.sort.standalone.channel.ProfileEvent; +import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder; import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory; import org.elasticsearch.action.DocWriteRequest; @@ -85,7 +86,12 @@ public class EsCallbackListener implements BulkProcessor.Listener { // is fail if (responseItem.isFailed()) { context.addSendResultMetric(event, context.getTaskName(), false, sendTime); - context.backDispatchQueue(requestItem); + event.incrementSendedTime(); + if (event.getSendedTime() <= CommonPropertiesHolder.getMaxSendFailTimes()) { + context.backDispatchQueue(requestItem); + } else { + event.negativeAck(); + } } else { context.addSendResultMetric(event, context.getTaskName(), true, sendTime); context.releaseDispatchQueue(requestItem); @@ -115,8 +121,12 @@ public class EsCallbackListener implements BulkProcessor.Listener { ProfileEvent event = requestItem.getEvent(); long sendTime = requestItem.getSendTime(); context.addSendResultMetric(event, context.getTaskName(), false, sendTime); - context.backDispatchQueue(requestItem); + event.incrementSendedTime(); + if (event.getSendedTime() <= CommonPropertiesHolder.getMaxSendFailTimes()) { + context.backDispatchQueue(requestItem); + } else { + event.negativeAck(); + } } } - } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java index 72b8ccc5e4..d10d32d02e 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java @@ -230,7 +230,7 @@ public class KafkaProducerCluster implements LifecycleAware { tx.commit(); profileEvent.ack(); } else { - tx.rollback(); + this.exceptionProcess(profileEvent, tx); } } else if (ex instanceof UnknownTopicOrPartitionException || !(ex instanceof RetriableException)) { @@ -238,7 +238,7 @@ public class KafkaProducerCluster implements LifecycleAware { tx.commit(); profileEvent.ack(); } else { - tx.rollback(); + this.exceptionProcess(profileEvent, tx); } LOG.error(String.format("send failed, topic is %s", topic), ex); sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime); @@ -247,7 +247,7 @@ public class KafkaProducerCluster implements LifecycleAware { }); return true; } catch (Exception e) { - tx.rollback(); + this.exceptionProcess(profileEvent, tx); tx.close(); LOG.error(e.getMessage(), e); sinkContext.addSendResultMetric(profileEvent, topic, false, sendTime); @@ -255,4 +255,13 @@ public class KafkaProducerCluster implements LifecycleAware { } } + private void exceptionProcess(ProfileEvent profileEvent, Transaction tx) { + profileEvent.incrementSendedTime(); + if (profileEvent.getSendedTime() <= CommonPropertiesHolder.getMaxSendFailTimes()) { + tx.rollback(); + } else { + tx.commit(); + profileEvent.negativeAck(); + } + } } diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java index ac4726f241..409f2ebe90 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/source/sortsdk/SortSdkSource.java @@ -185,6 +185,7 @@ public final class SortSdkSource extends AbstractSource clientConfig.setCallback(callback); Map<String, String> sortSdkParams = this.getSortClientConfigParameters(); clientConfig.setParameters(sortSdkParams); + clientConfig.setSendFailPauseConsumerMinutes(CommonPropertiesHolder.getSendFailPauseConsumerMinutes()); // create SortClient String configType = CommonPropertiesHolder