Andrew Klopper created KAFKA-9279:
-------------------------------------

             Summary: Silent data loss in Kafka producer
                 Key: KAFKA-9279
                 URL: https://issues.apache.org/jira/browse/KAFKA-9279
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 2.3.0
            Reporter: Andrew Klopper


It appears that a producer.commitTransaction() call can succeed even when an 
individual producer.send() call within the same transaction has failed. The 
following code demonstrates the issue:
{code:java}
package org.example.dataloss;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/**
 * Stand-alone demo program: opens a Kafka transaction, sends a single record
 * whose value is larger than the configured {@code max.request.size}, and then
 * commits the transaction, printing the outcome of each step.
 *
 * <p>Usage: {@code Main <bootstrap-server> <topic>}
 */
public class Main {

    /** Size of the random record value in bytes; max.request.size below is set to 1000000. */
    private static final int LARGE_PAYLOAD_BYTES = 2000000;

    /**
     * Runs the demo.
     *
     * @param args exactly two values: the bootstrap server and the target topic;
     *             any other count prints an error and exits with status 1
     */
    public static void main(final String[] args) {
        if (args.length != 2) {
            System.err.println("Invalid command-line arguments");
            System.exit(1);
        }
        final String bootstrapServer = args[0];
        final String topic = args[1];
        final Properties config = transactionalProducerConfig(bootstrapServer);

        try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
                config, new ByteArraySerializer(), new ByteArraySerializer())) {
            producer.initTransactions();
            producer.beginTransaction();

            final byte[] value = randomBytes(LARGE_PAYLOAD_BYTES);
            final byte[] key = "large".getBytes(StandardCharsets.UTF_8);
            final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);

            // The send result is only reported through this callback; nothing
            // here feeds back into the commit below.
            producer.send(record, (ignoredMetadata, failure) -> {
                if (failure != null) {
                    System.err.printf("ERROR: Large payload failed: %s\n", failure.getMessage());
                    return;
                }
                System.out.println("INFO: Large payload succeeded");
            });

            producer.commitTransaction();
            System.out.println("Commit succeeded");

        } catch (final Exception e) {
            System.err.printf("FATAL ERROR: %s", e.getMessage());
        }
    }

    /** Builds the idempotent, transactional producer settings used by the demo. */
    private static Properties transactionalProducerConfig(final String bootstrapServer) {
        final Properties config = new Properties();
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        config.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "500000");
        config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
        config.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "1000000");
        config.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "dataloss_01");
        config.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dataloss_01");
        return config;
    }

    /** Returns a new array of {@code length} bytes filled from a fresh {@link Random}. */
    private static byte[] randomBytes(final int length) {
        final byte[] bytes = new byte[length];
        new Random().nextBytes(bytes);
        return bytes;
    }
}
{code}
The code prints the following output:
{code:java}
ERROR: Large payload failed: The message is 2000093 bytes when serialized which 
is larger than the maximum request size you have configured with the 
max.request.size configuration.
Commit succeeded{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to