This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9030cf30c0 [INLONG-11658][Sort] Fix the NPE of the Kafka sink error 
log for some exceptions without metadata information (#11659)
9030cf30c0 is described below

commit 9030cf30c00dcb1916c4852a2799a065e4a9ff10
Author: vernedeng <[email protected]>
AuthorDate: Fri Jan 10 10:02:37 2025 +0800

    [INLONG-11658][Sort] Fix the NPE of the Kafka sink error log for some 
exceptions without metadata information (#11659)
    
    * [INLONG-11658][Sort] Fix the NPE of the Kafka sink error log for some 
exceptions without metadata information
---
 .../standalone/sink/kafka/KafkaProducerCluster.java | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index c73535a230..5b4f2f9050 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -30,6 +30,7 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -62,6 +63,8 @@ public class KafkaProducerCluster implements LifecycleAware {
 
     private KafkaProducer<String, byte[]> producer;
 
+    private long configuredMaxPayloadSize = 8388608L;
+
     public KafkaProducerCluster(
             String workerName,
             CacheClusterConfig cacheClusterConfig,
@@ -125,6 +128,7 @@ public class KafkaProducerCluster implements LifecycleAware 
{
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
nodeConfig.getBootstrapServers());
             props.put(ProducerConfig.CLIENT_ID_CONFIG, 
nodeConfig.getClientId() + "-" + workerName);
             LOG.info("init kafka client by node config info: " + props);
+            configuredMaxPayloadSize = 
Long.parseLong(props.getProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
             producer = new KafkaProducer<>(props, new StringSerializer(), new 
ByteArraySerializer());
             Preconditions.checkNotNull(producer);
         } catch (Exception e) {
@@ -217,14 +221,25 @@ public class KafkaProducerCluster implements 
LifecycleAware {
                             sinkContext.addSendResultMetric(profileEvent, 
topic, true, sendTime);
                             profileEvent.ack();
                         } else {
-                            if (ex instanceof UnknownTopicOrPartitionException
+
+                            if (ex instanceof RecordTooLargeException) {
+                                // for a message bigger than 
configuredMaxPayloadSize, just discard it;
+                                // otherwise, retry and wait for the server 
side to change the limitation
+                                if (record.value().length > 
configuredMaxPayloadSize) {
+                                    tx.commit();
+                                    profileEvent.ack();
+                                } else {
+                                    tx.rollback();
+                                }
+                            } else if (ex instanceof 
UnknownTopicOrPartitionException
                                     || !(ex instanceof RetriableException)) {
+                                // for non-retriable exception, just discard it
                                 tx.commit();
+                                profileEvent.ack();
                             } else {
                                 tx.rollback();
                             }
-                            LOG.error(String.format("send failed, topic is %s, 
partition is %s",
-                                    metadata.topic(), metadata.partition()), 
ex);
+                            LOG.error(String.format("send failed, topic is 
%s", topic), ex);
                             sinkContext.addSendResultMetric(profileEvent, 
topic, false, sendTime);
                         }
                         tx.close();

Reply via email to