This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9030cf30c0 [INLONG-11658][Sort] Fix the NPE of the Kafka sink error
log for some exceptions without metadata information (#11659)
9030cf30c0 is described below
commit 9030cf30c00dcb1916c4852a2799a065e4a9ff10
Author: vernedeng <[email protected]>
AuthorDate: Fri Jan 10 10:02:37 2025 +0800
[INLONG-11658][Sort] Fix the NPE of the Kafka sink error log for some
exceptions without metadata information (#11659)
* [INLONG-11658][Sort] Fix the NPE of the Kafka sink error log for some
exceptions without metadata information
---
.../standalone/sink/kafka/KafkaProducerCluster.java | 21 ++++++++++++++++++---
1 file changed, 18 insertions(+), 3 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index c73535a230..5b4f2f9050 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -30,6 +30,7 @@ import org.apache.flume.lifecycle.LifecycleState;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -62,6 +63,8 @@ public class KafkaProducerCluster implements LifecycleAware {
private KafkaProducer<String, byte[]> producer;
+ private long configuredMaxPayloadSize = 8388608L;
+
public KafkaProducerCluster(
String workerName,
CacheClusterConfig cacheClusterConfig,
@@ -125,6 +128,7 @@ public class KafkaProducerCluster implements LifecycleAware
{
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodeConfig.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG,
nodeConfig.getClientId() + "-" + workerName);
LOG.info("init kafka client by node config info: " + props);
+ configuredMaxPayloadSize =
Long.parseLong(props.getProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
producer = new KafkaProducer<>(props, new StringSerializer(), new
ByteArraySerializer());
Preconditions.checkNotNull(producer);
} catch (Exception e) {
@@ -217,14 +221,25 @@ public class KafkaProducerCluster implements
LifecycleAware {
sinkContext.addSendResultMetric(profileEvent,
topic, true, sendTime);
profileEvent.ack();
} else {
- if (ex instanceof UnknownTopicOrPartitionException
+
+ if (ex instanceof RecordTooLargeException) {
+                            // for a message bigger than
configuredMaxPayloadSize, just discard it;
+                            // otherwise, retry and wait for the server
side to change the limitation
+ if (record.value().length >
configuredMaxPayloadSize) {
+ tx.commit();
+ profileEvent.ack();
+ } else {
+ tx.rollback();
+ }
+ } else if (ex instanceof
UnknownTopicOrPartitionException
|| !(ex instanceof RetriableException)) {
+ // for non-retriable exception, just discard it
tx.commit();
+ profileEvent.ack();
} else {
tx.rollback();
}
- LOG.error(String.format("send failed, topic is %s,
partition is %s",
- metadata.topic(), metadata.partition()),
ex);
+ LOG.error(String.format("send failed, topic is
%s", topic), ex);
sinkContext.addSendResultMetric(profileEvent,
topic, false, sendTime);
}
tx.close();