Andrew Klopper created KAFKA-9279:
-------------------------------------

             Summary: Silent data loss in Kafka producer
                 Key: KAFKA-9279
                 URL: https://issues.apache.org/jira/browse/KAFKA-9279
             Project: Kafka
          Issue Type: Bug
          Components: producer
    Affects Versions: 2.3.0
            Reporter: Andrew Klopper

It appears that it is possible for a producer.commitTransaction() call to succeed even if an individual producer.send() call has failed. The following code demonstrates the issue:

{code:java}
package org.example.dataloss;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Main {

    public static void main(final String[] args) {
        final Properties producerProps = new Properties();

        if (args.length != 2) {
            System.err.println("Invalid command-line arguments");
            System.exit(1);
        }
        final String bootstrapServer = args[0];
        final String topic = args[1];

        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
        producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
        producerProps.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
        producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
        producerProps.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");

        try (final KafkaProducer<byte[], byte[]> producer =
                new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.initTransactions();
            producer.beginTransaction();

            final Random random = new Random();
            final byte[] largePayload = new byte[2000000];
            random.nextBytes(largePayload);

            producer.send(
                new ProducerRecord<>(
                    topic,
                    "large".getBytes(StandardCharsets.UTF_8),
                    largePayload
                ),
                (metadata, e) -> {
                    if (e == null) {
                        System.out.println("INFO: Large payload succeeded");
                    } else {
                        System.err.printf("ERROR: Large payload failed: %s\n", e.getMessage());
                    }
                }
            );

            producer.commitTransaction();
            System.out.println("Commit succeeded");
        } catch (final Exception e) {
            System.err.printf("FATAL ERROR: %s", e.getMessage());
        }
    }
}
{code}

The code prints the following output:

{code:java}
ERROR: Large payload failed: The message is 2000093 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Commit succeeded
{code}
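
A possible application-side workaround, sketched here under stated assumptions rather than as a fix in the client, is to record send failures in the callback and check them before committing. SendFailureTracker and its methods are hypothetical names introduced for illustration; they are not part of the Kafka client API, and the main method only simulates callback results without a broker.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not part of the Kafka client API): remembers the first
// exception reported to any send callback so it can be checked before commit.
final class SendFailureTracker {
    private final AtomicReference<Exception> firstFailure = new AtomicReference<>();

    // Call from the send callback; e is null when the send succeeded.
    void onSendCompleted(final Exception e) {
        if (e != null) {
            // Keep only the first failure; later ones are ignored.
            firstFailure.compareAndSet(null, e);
        }
    }

    boolean hasFailed() {
        return firstFailure.get() != null;
    }

    // Throws if any send has failed, so the caller can abort instead of committing.
    void throwIfFailed() {
        final Exception e = firstFailure.get();
        if (e != null) {
            throw new IllegalStateException("At least one send failed: " + e.getMessage(), e);
        }
    }

    public static void main(final String[] args) {
        final SendFailureTracker tracker = new SendFailureTracker();
        tracker.onSendCompleted(null); // simulated successful send
        tracker.onSendCompleted(new RuntimeException("record too large")); // simulated failed send
        try {
            tracker.throwIfFailed();
            System.out.println("Safe to commit");
        } catch (final IllegalStateException ex) {
            System.out.println("Abort: " + ex.getMessage());
        }
    }
}
```

In the reproduction above, the send callback would call tracker.onSendCompleted(e), and the code would call producer.flush() followed by tracker.throwIfFailed() before producer.commitTransaction(), calling producer.abortTransaction() if the check throws. This only makes the failure visible to the application; it does not change the behaviour of commitTransaction() itself.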